This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 17ae38b7356 branch-3.0: [opt](recycler) Improve robustness and 
observability  #45617 (#45856)
17ae38b7356 is described below

commit 17ae38b73565921c5aa7b2ff32bc4b8f2ce3dbda
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 25 09:37:19 2024 +0800

    branch-3.0: [opt](recycler) Improve robustness and observability  #45617 
(#45856)
    
    Cherry-picked from #45617
    
    Co-authored-by: Gavin Chou <ga...@selectdb.com>
---
 cloud/src/common/bvars.cpp              | 160 ++++++---------
 cloud/src/common/bvars.h                |  96 ++++-----
 cloud/src/common/config.h               |   2 +
 cloud/src/meta-service/meta_service.cpp |   1 +
 cloud/src/recycler/obj_storage_client.h |  10 +-
 cloud/src/recycler/recycler.cpp         | 341 +++++++++++++++++++++-----------
 cloud/src/recycler/recycler.h           |  20 +-
 cloud/src/recycler/s3_accessor.cpp      |  42 +++-
 cloud/src/recycler/s3_obj_client.cpp    |   7 +-
 9 files changed, 398 insertions(+), 281 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 746f109ac6d..a0b0a2da9c2 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -20,6 +20,8 @@
 #include <cstdint>
 #include <stdexcept>
 
+// clang-format off
+
 // meta-service's bvars
 BvarLatencyRecorderWithTag g_bvar_ms_begin_txn("ms", "begin_txn");
 BvarLatencyRecorderWithTag g_bvar_ms_precommit_txn("ms", "precommit_txn");
@@ -71,23 +73,27 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_copy_files("ms", 
"get_copy_files");
 BvarLatencyRecorderWithTag g_bvar_ms_filter_copy_files("ms", 
"filter_copy_files");
 BvarLatencyRecorderWithTag g_bvar_ms_update_delete_bitmap("ms", 
"update_delete_bitmap");
 BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap("ms", 
"get_delete_bitmap");
-BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms",
-                                                                   
"get_delete_bitmap_update_lock");
+BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms", 
"get_delete_bitmap_update_lock");
 BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap("ms", 
"remove_delete_bitmap");
-BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock(
-        "ms", "remove_delete_bitmap_update_lock");
+BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock("ms", 
"remove_delete_bitmap_update_lock");
 BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
 BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", 
"get_rl_task_commit_attach");
 BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", 
"reset_rl_progress");
 BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
-
 BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", 
"start_tablet_job");
 BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", 
"finish_tablet_job");
 BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status("ms", 
"get_cluster_status");
 BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status("ms", 
"set_cluster_status");
-
 BvarLatencyRecorderWithTag g_bvar_ms_check_kv("ms", "check_kv");
 
+// recycler's bvars
+// TODO: use mbvar for per instance, 
https://github.com/apache/brpc/blob/master/docs/cn/mbvar_c++.md
+BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_index_earlest_ts("recycler", 
"recycle_index_earlest_ts");
+BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_partition_earlest_ts("recycler", 
"recycle_partition_earlest_ts");
+BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_rowset_earlest_ts("recycler", 
"recycle_rowset_earlest_ts");
+BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_tmp_rowset_earlest_ts("recycler", 
"recycle_tmp_rowset_earlest_ts");
+BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_expired_txn_label_earlest_ts("recycler", 
"recycle_expired_txn_label_earlest_ts");
+
 // txn_kv's bvars
 bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
 bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");
@@ -101,107 +107,65 @@ bvar::LatencyRecorder 
g_bvar_txn_kv_range_remove("txn_kv", "range_remove");
 bvar::LatencyRecorder g_bvar_txn_kv_get_read_version("txn_kv", 
"get_read_version");
 bvar::LatencyRecorder g_bvar_txn_kv_get_committed_version("txn_kv", 
"get_committed_version");
 bvar::LatencyRecorder g_bvar_txn_kv_batch_get("txn_kv", "batch_get");
-
 bvar::Adder<int64_t> g_bvar_txn_kv_get_count_normalized("txn_kv", 
"get_count_normalized");
-
 bvar::Adder<int64_t> g_bvar_txn_kv_commit_error_counter;
-bvar::Window<bvar::Adder<int64_t> > g_bvar_txn_kv_commit_error_counter_minute(
-        "txn_kv", "commit_error", &g_bvar_txn_kv_commit_error_counter, 60);
-
+bvar::Window<bvar::Adder<int64_t> > 
g_bvar_txn_kv_commit_error_counter_minute("txn_kv", "commit_error", 
&g_bvar_txn_kv_commit_error_counter, 60);
 bvar::Adder<int64_t> g_bvar_txn_kv_commit_conflict_counter;
-bvar::Window<bvar::Adder<int64_t> > 
g_bvar_txn_kv_commit_conflict_counter_minute(
-        "txn_kv", "commit_conflict", &g_bvar_txn_kv_commit_conflict_counter, 
60);
+bvar::Window<bvar::Adder<int64_t> > 
g_bvar_txn_kv_commit_conflict_counter_minute("txn_kv", "commit_conflict", 
&g_bvar_txn_kv_commit_conflict_counter, 60);
 
+// fdb's bvars
 const int64_t BVAR_FDB_INVALID_VALUE = -99999999L;
 bvar::Status<int64_t> g_bvar_fdb_client_count("fdb_client_count", 
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_configuration_coordinators_count(
-        "fdb_configuration_coordinators_count", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_configuration_usable_regions("fdb_configuration_usable_regions",
-                                                              
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_coordinators_unreachable_count(
-        "fdb_coordinators_unreachable_count", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_fault_tolerance_count("fdb_fault_tolerance_count",
-                                                       BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_average_partition_size_bytes(
-        "fdb_data_average_partition_size_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_data_log_server_space_bytes("fdb_data_log_server_space_bytes",
-                                                             
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_moving_data_highest_priority(
-        "fdb_data_moving_data_highest_priority", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_moving_data_in_flight_bytes(
-        "fdb_data_moving_data_in_flight_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_moving_data_in_queue_bytes(
-        "fdb_data_moving_data_in_queue_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_moving_total_written_bytes(
-        "fdb_data_moving_total_written_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_data_partition_count("fdb_data_partition_count",
-                                                      BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_storage_server_space_bytes(
-        "fdb_data_storage_server_space_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_state_min_replicas_remaining(
-        "fdb_data_state_min_replicas_remaining", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_data_total_kv_size_bytes("fdb_data_total_kv_size_bytes",
-                                                          
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_data_total_disk_used_bytes("fdb_data_total_disk_used_bytes",
-                                                            
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_configuration_coordinators_count("fdb_configuration_coordinators_count",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_configuration_usable_regions("fdb_configuration_usable_regions", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_coordinators_unreachable_count("fdb_coordinators_unreachable_count", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_fault_tolerance_count("fdb_fault_tolerance_count", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_data_average_partition_size_bytes("fdb_data_average_partition_size_bytes",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_data_log_server_space_bytes("fdb_data_log_server_space_bytes", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_data_moving_data_highest_priority("fdb_data_moving_data_highest_priority",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_data_moving_data_in_flight_bytes("fdb_data_moving_data_in_flight_bytes",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_data_moving_data_in_queue_bytes("fdb_data_moving_data_in_queue_bytes",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_data_moving_total_written_bytes("fdb_data_moving_total_written_bytes",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_data_partition_count("fdb_data_partition_count", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_data_storage_server_space_bytes("fdb_data_storage_server_space_bytes",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_data_state_min_replicas_remaining("fdb_data_state_min_replicas_remaining",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_data_total_kv_size_bytes("fdb_data_total_kv_size_bytes", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_data_total_disk_used_bytes("fdb_data_total_disk_used_bytes", 
BVAR_FDB_INVALID_VALUE);
 bvar::Status<int64_t> g_bvar_fdb_generation("fdb_generation", 
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_incompatible_connections("fdb_incompatible_connections",
-                                                          
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_latency_probe_transaction_start_ns(
-        "fdb_latency_probe_transaction_start_ns", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_latency_probe_commit_ns("fdb_latency_probe_commit_ns",
-                                                         
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_latency_probe_read_ns("fdb_latency_probe_read_ns",
-                                                       BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_incompatible_connections("fdb_incompatible_connections", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_latency_probe_transaction_start_ns("fdb_latency_probe_transaction_start_ns",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_latency_probe_commit_ns("fdb_latency_probe_commit_ns", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_latency_probe_read_ns("fdb_latency_probe_read_ns", 
BVAR_FDB_INVALID_VALUE);
 bvar::Status<int64_t> g_bvar_fdb_machines_count("fdb_machines_count", 
BVAR_FDB_INVALID_VALUE);
 bvar::Status<int64_t> g_bvar_fdb_process_count("fdb_process_count", 
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_qos_worst_data_lag_storage_server_ns(
-        "fdb_qos_worst_data_lag_storage_server_ns", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_qos_worst_durability_lag_storage_server_ns(
-        "fdb_qos_worst_durability_lag_storage_server_ns", 
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_qos_worst_log_server_queue_bytes(
-        "fdb_qos_worst_log_server_queue_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_qos_worst_storage_server_queue_bytes(
-        "fdb_qos_worst_storage_server_queue_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_workload_conflict_rate_hz("fdb_workload_conflict_rate_hz",
-                                                           
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_workload_location_rate_hz("fdb_workload_location_rate_hz",
-                                                           
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_workload_keys_read_hz("fdb_workload_keys_read_hz",
-                                                       BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_workload_read_bytes_hz("fdb_workload_read_bytes_hz",
-                                                        
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_workload_read_rate_hz("fdb_workload_read_rate_hz",
-                                                       BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_workload_write_rate_hz("fdb_workload_write_rate_hz",
-                                                        
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> 
g_bvar_fdb_workload_written_bytes_hz("fdb_workload_written_bytes_hz",
-                                                           
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_workload_transactions_started_hz(
-        "fdb_workload_transactions_started_hz", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_workload_transactions_committed_hz(
-        "fdb_workload_transactions_committed_hz", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_workload_transactions_rejected_hz(
-        "fdb_workload_transactions_rejected_hz", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_client_thread_busyness_percent(
-        "fdb_client_thread_busyness_percent", BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_qos_worst_data_lag_storage_server_ns("fdb_qos_worst_data_lag_storage_server_ns",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_qos_worst_durability_lag_storage_server_ns("fdb_qos_worst_durability_lag_storage_server_ns",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_qos_worst_log_server_queue_bytes("fdb_qos_worst_log_server_queue_bytes",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_qos_worst_storage_server_queue_bytes("fdb_qos_worst_storage_server_queue_bytes",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_workload_conflict_rate_hz("fdb_workload_conflict_rate_hz", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_workload_location_rate_hz("fdb_workload_location_rate_hz", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_workload_keys_read_hz("fdb_workload_keys_read_hz", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_workload_read_bytes_hz("fdb_workload_read_bytes_hz", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_workload_read_rate_hz("fdb_workload_read_rate_hz", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_workload_write_rate_hz("fdb_workload_write_rate_hz", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_workload_written_bytes_hz("fdb_workload_written_bytes_hz", 
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_workload_transactions_started_hz("fdb_workload_transactions_started_hz",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_workload_transactions_committed_hz("fdb_workload_transactions_committed_hz",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_workload_transactions_rejected_hz("fdb_workload_transactions_rejected_hz",
 BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t> 
g_bvar_fdb_client_thread_busyness_percent("fdb_client_thread_busyness_percent", 
BVAR_FDB_INVALID_VALUE);
 
 // checker's bvars
-BvarStatusWithTag<long> g_bvar_checker_num_scanned("checker", "num_scanned");
-BvarStatusWithTag<long> g_bvar_checker_num_scanned_with_segment("checker",
-                                                                
"num_scanned_with_segment");
-BvarStatusWithTag<long> g_bvar_checker_num_check_failed("checker", 
"num_check_failed");
-BvarStatusWithTag<long> g_bvar_checker_check_cost_s("checker", 
"check_cost_seconds");
-BvarStatusWithTag<long> g_bvar_checker_enqueue_cost_s("checker", 
"enqueue_cost_seconds");
-BvarStatusWithTag<long> g_bvar_checker_last_success_time_ms("checker", 
"last_success_time_ms");
-BvarStatusWithTag<long> g_bvar_checker_instance_volume("checker", 
"instance_volume");
-BvarStatusWithTag<long> g_bvar_inverted_checker_num_scanned("checker", 
"num_inverted_scanned");
-BvarStatusWithTag<long> g_bvar_inverted_checker_num_check_failed("checker",
-                                                                 
"num_inverted_check_failed");
+BvarStatusWithTag<int64_t> g_bvar_checker_num_scanned("checker", 
"num_scanned");
+BvarStatusWithTag<int64_t> g_bvar_checker_num_scanned_with_segment("checker", 
"num_scanned_with_segment");
+BvarStatusWithTag<int64_t> g_bvar_checker_num_check_failed("checker", 
"num_check_failed");
+BvarStatusWithTag<int64_t> g_bvar_checker_check_cost_s("checker", 
"check_cost_seconds");
+BvarStatusWithTag<int64_t> g_bvar_checker_enqueue_cost_s("checker", 
"enqueue_cost_seconds");
+BvarStatusWithTag<int64_t> g_bvar_checker_last_success_time_ms("checker", 
"last_success_time_ms");
+BvarStatusWithTag<int64_t> g_bvar_checker_instance_volume("checker", 
"instance_volume");
+BvarStatusWithTag<int64_t> g_bvar_inverted_checker_num_scanned("checker", 
"num_inverted_scanned");
+BvarStatusWithTag<int64_t> g_bvar_inverted_checker_num_check_failed("checker", 
"num_inverted_check_failed");
+BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_leaked_delete_bitmaps("checker", 
"leaked_delete_bitmaps");
+BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_abnormal_delete_bitmaps("checker", 
"abnormal_delete_bitmaps");
+BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_delete_bitmaps_scanned("checker", 
"delete_bitmap_keys_scanned");
 
-BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_leaked_delete_bitmaps("checker",
-                                                                         
"leaked_delete_bitmaps");
-BvarStatusWithTag<int64_t> g_bvar_inverted_checker_abnormal_delete_bitmaps(
-        "checker", "abnormal_delete_bitmaps");
-BvarStatusWithTag<int64_t> g_bvar_inverted_checker_delete_bitmaps_scanned(
-        "checker", "delete_bitmap_keys_scanned");
\ No newline at end of file
+// clang-format on
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index d0ad2e97957..93340a6c0d2 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -26,34 +26,53 @@
 #include <memory>
 #include <mutex>
 #include <string>
-
-class BvarLatencyRecorderWithTag {
+#include <type_traits>
+
+/**
+ * Manage bvars that with similar names (identical prefix)
+ * ${module}_${name}_${tag}
+ * where `tag` is added automatically when calling `get` or `put`
+ */
+template <typename Bvar, bool is_status = false>
+class BvarWithTag {
 public:
-    BvarLatencyRecorderWithTag(std::string module, std::string name)
+    BvarWithTag(std::string module, std::string name)
             : module_(std::move(module)), name_(std::move(name)) {}
 
-    void put(const std::string& tag, int64_t value) {
-        std::shared_ptr<bvar::LatencyRecorder> instance = nullptr;
+    template <typename ValType>
+        requires std::is_integral_v<ValType>
+    void put(const std::string& tag, ValType value) {
+        std::shared_ptr<Bvar> instance = nullptr;
         {
             std::lock_guard<bthread::Mutex> l(mutex_);
             auto it = bvar_map_.find(tag);
             if (it == bvar_map_.end()) {
-                instance = std::make_shared<bvar::LatencyRecorder>(module_, 
name_ + "_" + tag);
+                instance = std::make_shared<Bvar>(module_, name_ + "_" + tag, 
ValType());
                 bvar_map_[tag] = instance;
             } else {
                 instance = it->second;
             }
         }
-        (*instance) << value;
+        // FIXME(gavin): check bvar::Adder and more
+        if constexpr (std::is_same_v<Bvar, bvar::LatencyRecorder>) {
+            (*instance) << value;
+        } else if constexpr (is_status) {
+            instance->set_value(value);
+        } else {
+            // This branch is meant to be unreachable, add an assert(false) here to
+            // prevent missing branch match.
+            // Postpone deduction of static_assert by evaluating sizeof(T)
+            static_assert(!sizeof(Bvar), "all types must be matched with if 
constexpr");
+        }
     }
 
-    std::shared_ptr<bvar::LatencyRecorder> get(const std::string& tag) {
-        std::shared_ptr<bvar::LatencyRecorder> instance = nullptr;
+    std::shared_ptr<Bvar> get(const std::string& tag) {
+        std::shared_ptr<Bvar> instance = nullptr;
         std::lock_guard<bthread::Mutex> l(mutex_);
 
         auto it = bvar_map_.find(tag);
         if (it == bvar_map_.end()) {
-            instance = std::make_shared<bvar::LatencyRecorder>(module_, name_ 
+ "_" + tag);
+            instance = std::make_shared<Bvar>(module_, name_ + "_" + tag);
             bvar_map_[tag] = instance;
             return instance;
         }
@@ -69,54 +88,14 @@ private:
     bthread::Mutex mutex_;
     std::string module_;
     std::string name_;
-    std::map<std::string, std::shared_ptr<bvar::LatencyRecorder>> bvar_map_;
+    std::map<std::string, std::shared_ptr<Bvar>> bvar_map_;
 };
 
-template <class T>
-class BvarStatusWithTag {
-public:
-    BvarStatusWithTag(std::string module, std::string name)
-            : module_(std::move(module)), name_(std::move(name)) {}
-
-    void put(const std::string& tag, T value) {
-        std::shared_ptr<bvar::Status<T>> instance = nullptr;
-        {
-            std::lock_guard<bthread::Mutex> l(mutex_);
-            auto it = bvar_map_.find(tag);
-            if (it == bvar_map_.end()) {
-                instance = std::make_shared<bvar::Status<T>>(module_, name_ + 
"_" + tag, T());
-                bvar_map_[tag] = instance;
-            } else {
-                instance = it->second;
-            }
-        }
-        (*instance).set_value(value);
-    }
-
-    std::shared_ptr<bvar::Status<T>> get(const std::string& tag) {
-        std::shared_ptr<bvar::Status<T>> instance = nullptr;
-        std::lock_guard<bthread::Mutex> l(mutex_);
-
-        auto it = bvar_map_.find(tag);
-        if (it == bvar_map_.end()) {
-            instance = std::make_shared<bvar::Status<T>>(module_, name_ + "_" 
+ tag);
-            bvar_map_[tag] = instance;
-            return instance;
-        }
-        return it->second;
-    }
-
-    void remove(const std::string& tag) {
-        std::lock_guard<bthread::Mutex> l(mutex_);
-        bvar_map_.erase(tag);
-    }
+using BvarLatencyRecorderWithTag = BvarWithTag<bvar::LatencyRecorder>;
 
-private:
-    bthread::Mutex mutex_;
-    std::string module_;
-    std::string name_;
-    std::map<std::string, std::shared_ptr<bvar::Status<T>>> bvar_map_;
-};
+template <typename T>
+    requires std::is_integral_v<T>
+using BvarStatusWithTag = BvarWithTag<bvar::Status<T>, true>;
 
 // meta-service's bvars
 extern BvarLatencyRecorderWithTag g_bvar_ms_begin_txn;
@@ -182,6 +161,13 @@ extern BvarLatencyRecorderWithTag 
g_bvar_ms_reset_rl_progress;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
 extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
 
+// recycler's bvars
+extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_index_earlest_ts;
+extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_partition_earlest_ts;
+extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_rowset_earlest_ts;
+extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_tmp_rowset_earlest_ts;
+extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_expired_txn_label_earlest_ts;
+
 // txn_kv's bvars
 extern bvar::LatencyRecorder g_bvar_txn_kv_get;
 extern bvar::LatencyRecorder g_bvar_txn_kv_range_get;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index d2ee284d595..ec8b1ce9903 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -66,9 +66,11 @@ CONF_mInt64(dropped_partition_retention_seconds, "10800"); 
// 3h
 CONF_Strings(recycle_whitelist, ""); // Comma seprated list
 // These instances will not be recycled, only effective when whitelist is 
empty.
 CONF_Strings(recycle_blacklist, ""); // Comma seprated list
+// IO worker thread pool concurrency: object list, delete
 CONF_mInt32(instance_recycler_worker_pool_size, "32");
 CONF_Bool(enable_checker, "false");
 // The parallelism for parallel recycle operation
+// s3_producer_pool recycle_tablet_pool, delete single object in this pool
 CONF_Int32(recycle_pool_parallelism, "40");
 // Currently only used for recycler test
 CONF_Bool(enable_inverted_check, "false");
diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index 18a1eefce8f..9af2b785904 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1183,6 +1183,7 @@ void 
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
                          << ", rowset_id=" << rowset_id
                          << ", rowset_meta_bytes=" << 
rowset_meta.ByteSizeLong()
                          << ", segment_key_bounds_bytes=" << 
segment_key_bounds_bytes
+                         << ", num_segments=" << rowset_meta.num_segments()
                          << ", rowset_meta=" << rowset_meta.ShortDebugString();
         }
         code = cast_as<ErrCategory::COMMIT>(err);
diff --git a/cloud/src/recycler/obj_storage_client.h 
b/cloud/src/recycler/obj_storage_client.h
index fc0211820d1..b3d5cd4978e 100644
--- a/cloud/src/recycler/obj_storage_client.h
+++ b/cloud/src/recycler/obj_storage_client.h
@@ -30,9 +30,15 @@ struct ObjectStoragePathRef {
 };
 
 struct ObjectStorageResponse {
-    ObjectStorageResponse(int r = 0, std::string msg = "") : ret(r), 
error_msg(std::move(msg)) {}
+    enum Code : int {
+        UNDEFINED = -1,
+        OK = 0,
+        NOT_FOUND = 1,
+    };
+
+    ObjectStorageResponse(int r = OK, std::string msg = "") : ret(r), 
error_msg(std::move(msg)) {}
     // clang-format off
-    int ret {0}; // To unify the error handle logic with BE, we'd better use 
the same error code as BE
+    int ret {OK}; // To unify the error handle logic with BE, we'd better use 
the same error code as BE
     // clang-format on
     std::string error_msg;
 };
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index ca22b28e031..84d755958ee 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -40,6 +40,7 @@
 #ifdef UNIT_TEST
 #include "../test/mock_accessor.h"
 #endif
+#include "common/bvars.h"
 #include "common/config.h"
 #include "common/encryption_util.h"
 #include "common/logging.h"
@@ -576,11 +577,11 @@ int InstanceRecycler::init() {
 template <typename... Func>
 auto task_wrapper(Func... funcs) -> std::function<int()> {
     return [funcs...]() {
-        return [](std::initializer_list<int> numbers) {
+        return [](std::initializer_list<int> ret_vals) {
             int i = 0;
-            for (int num : numbers) {
-                if (num != 0) {
-                    i = num;
+            for (int ret : ret_vals) {
+                if (ret != 0) {
+                    i = ret;
                 }
             }
             return i;
@@ -597,11 +598,15 @@ int InstanceRecycler::do_recycle() {
                                         fmt::format("instance id {}", 
instance_id_),
                                         [](int r) { return r != 0; });
         sync_executor
-                .add(task_wrapper(
+                .add(task_wrapper( // dropped table and dropped partition need 
to be recycled in series
+                                   // because they may both recycle the same 
set of tablets
+                        // recycle dropped table or indexes(mv, rollup)
                         [this]() -> int { return 
InstanceRecycler::recycle_indexes(); },
-                        [this]() -> int { return 
InstanceRecycler::recycle_partitions(); },
-                        [this]() -> int { return 
InstanceRecycler::recycle_tmp_rowsets(); },
-                        [this]() -> int { return 
InstanceRecycler::recycle_rowsets(); }))
+                        // recycle dropped partitions
+                        [this]() -> int { return 
InstanceRecycler::recycle_partitions(); }))
+                .add(task_wrapper(
+                        [this]() -> int { return 
InstanceRecycler::recycle_tmp_rowsets(); }))
+                .add(task_wrapper([this]() -> int { return 
InstanceRecycler::recycle_rowsets(); }))
                 .add(task_wrapper(
                         [this]() { return 
InstanceRecycler::abort_timeout_txn(); },
                         [this]() { return 
InstanceRecycler::recycle_expired_txn_label(); }))
@@ -625,6 +630,11 @@ int InstanceRecycler::do_recycle() {
     }
 }
 
+/**
+ * 1. delete all remote data
+ * 2. delete all kv
+ * 3. remove instance kv
+ */
 int InstanceRecycler::recycle_deleted_instance() {
     LOG_INFO("begin to recycle deleted instance").tag("instance_id", 
instance_id_);
 
@@ -638,6 +648,29 @@ int InstanceRecycler::recycle_deleted_instance() {
                   << "s, instance_id=" << instance_id_;
     });
 
+    // delete all remote data
+    for (auto& [_, accessor] : accessor_map_) {
+        if (stopped()) {
+            return ret;
+        }
+
+        LOG(INFO) << "begin to delete all objects in " << accessor->uri();
+        int del_ret = accessor->delete_all();
+        if (del_ret == 0) {
+            LOG(INFO) << "successfully delete all objects in " << 
accessor->uri();
+        } else if (del_ret != 1) { // no need to log, because S3Accessor has 
logged this error
+            // If `del_ret == 1`, it can be considered that the object data 
has been recycled by cloud platform,
+            // so the recycling has been successful.
+            ret = -1;
+        }
+    }
+
+    if (ret != 0) {
+        LOG(WARNING) << "failed to delete all data of deleted instance=" << 
instance_id_;
+        return ret;
+    }
+
+    // delete all kv
     std::unique_ptr<Transaction> txn;
     TxnErrorCode err = txn_kv_->create_txn(&txn);
     if (err != TxnErrorCode::TXN_OK) {
@@ -681,22 +714,6 @@ int InstanceRecycler::recycle_deleted_instance() {
         ret = -1;
     }
 
-    for (auto& [_, accessor] : accessor_map_) {
-        if (stopped()) {
-            return ret;
-        }
-
-        LOG(INFO) << "begin to delete all objects in " << accessor->uri();
-        int del_ret = accessor->delete_all();
-        if (del_ret == 0) {
-            LOG(INFO) << "successfully delete all objects in " << 
accessor->uri();
-        } else if (del_ret != 1) { // no need to log, because S3Accessor has 
logged this error
-            // If `del_ret == 1`, it can be considered that the object data 
has been recycled by cloud platform,
-            // so the recycling has been successful.
-            ret = -1;
-        }
-    }
-
     if (ret == 0) {
         // remove instance kv
         // ATTN: MUST ensure that cloud platform won't regenerate the same 
instance id
@@ -721,9 +738,9 @@ int InstanceRecycler::recycle_deleted_instance() {
 
 int InstanceRecycler::recycle_indexes() {
     const std::string task_name = "recycle_indexes";
-    int num_scanned = 0;
-    int num_expired = 0;
-    int num_recycled = 0;
+    int64_t num_scanned = 0;
+    int64_t num_expired = 0;
+    int64_t num_recycled = 0;
 
     RecycleIndexKeyInfo index_key_info0 {instance_id_, 0};
     RecycleIndexKeyInfo index_key_info1 {instance_id_, INT64_MAX};
@@ -748,9 +765,11 @@ int InstanceRecycler::recycle_indexes() {
                 .tag("num_recycled", num_recycled);
     });
 
-    auto calc_expiration = [](const RecycleIndexPB& index) -> int64_t {
+    int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+    auto calc_expiration = [&earlest_ts, this](const RecycleIndexPB& index) {
         if (config::force_immediate_recycle) {
-            return 0;
+            return 0L;
         }
         int64_t expiration = index.expiration() > 0 ? index.expiration() : 
index.creation_time();
         int64_t retention_seconds = config::retention_seconds;
@@ -758,7 +777,12 @@ int InstanceRecycler::recycle_indexes() {
             retention_seconds =
                     std::min(config::dropped_index_retention_seconds, 
retention_seconds);
         }
-        return expiration + retention_seconds;
+        int64_t final_expiration = expiration + retention_seconds;
+        if (earlest_ts > final_expiration) {
+            earlest_ts = final_expiration;
+            g_bvar_recycler_recycle_index_earlest_ts.put(instance_id_, 
earlest_ts);
+        }
+        return final_expiration;
     };
 
     // Elements in `index_keys` has the same lifetime as `it` in 
`scan_and_recycle`
@@ -919,9 +943,9 @@ bool check_lazy_txn_finished(std::shared_ptr<TxnKv> txn_kv, 
const std::string in
 
 int InstanceRecycler::recycle_partitions() {
     const std::string task_name = "recycle_partitions";
-    int num_scanned = 0;
-    int num_expired = 0;
-    int num_recycled = 0;
+    int64_t num_scanned = 0;
+    int64_t num_expired = 0;
+    int64_t num_recycled = 0;
 
     RecyclePartKeyInfo part_key_info0 {instance_id_, 0};
     RecyclePartKeyInfo part_key_info1 {instance_id_, INT64_MAX};
@@ -946,9 +970,11 @@ int InstanceRecycler::recycle_partitions() {
                 .tag("num_recycled", num_recycled);
     });
 
-    auto calc_expiration = [](const RecyclePartitionPB& partition) -> int64_t {
+    int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+    auto calc_expiration = [&earlest_ts, this](const RecyclePartitionPB& 
partition) {
         if (config::force_immediate_recycle) {
-            return 0;
+            return 0L;
         }
         int64_t expiration =
                 partition.expiration() > 0 ? partition.expiration() : 
partition.creation_time();
@@ -957,7 +983,12 @@ int InstanceRecycler::recycle_partitions() {
             retention_seconds =
                     std::min(config::dropped_partition_retention_seconds, 
retention_seconds);
         }
-        return expiration + retention_seconds;
+        int64_t final_expiration = expiration + retention_seconds;
+        if (earlest_ts > final_expiration) {
+            earlest_ts = final_expiration;
+            g_bvar_recycler_recycle_partition_earlest_ts.put(instance_id_, 
earlest_ts);
+        }
+        return final_expiration;
     };
 
     // Elements in `partition_keys` has the same lifetime as `it` in 
`scan_and_recycle`
@@ -1074,8 +1105,8 @@ int InstanceRecycler::recycle_partitions() {
 }
 
 int InstanceRecycler::recycle_versions() {
-    int num_scanned = 0;
-    int num_recycled = 0;
+    int64_t num_scanned = 0;
+    int64_t num_recycled = 0;
 
     LOG_INFO("begin to recycle table and partition 
versions").tag("instance_id", instance_id_);
 
@@ -1152,13 +1183,14 @@ int InstanceRecycler::recycle_versions() {
 
 int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, 
int64_t partition_id,
                                       bool is_empty_tablet) {
-    int num_scanned = 0;
-    std::atomic_int num_recycled = 0;
+    int64_t num_scanned = 0;
+    std::atomic_long num_recycled = 0;
 
     std::string tablet_key_begin, tablet_key_end;
     std::string stats_key_begin, stats_key_end;
     std::string job_key_begin, job_key_end;
 
+    std::string tablet_belongs;
     if (partition_id > 0) {
         // recycle tablets in a partition belonging to the index
         meta_tablet_key({instance_id_, table_id, index_id, partition_id, 0}, 
&tablet_key_begin);
@@ -1167,6 +1199,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, 
int64_t index_id, int64_
         stats_tablet_key({instance_id_, table_id, index_id, partition_id + 1, 
0}, &stats_key_end);
         job_tablet_key({instance_id_, table_id, index_id, partition_id, 0}, 
&job_key_begin);
         job_tablet_key({instance_id_, table_id, index_id, partition_id + 1, 
0}, &job_key_end);
+        tablet_belongs = "partition";
     } else {
         // recycle tablets in the index
         meta_tablet_key({instance_id_, table_id, index_id, 0, 0}, 
&tablet_key_begin);
@@ -1175,9 +1208,10 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, 
int64_t index_id, int64_
         stats_tablet_key({instance_id_, table_id, index_id + 1, 0, 0}, 
&stats_key_end);
         job_tablet_key({instance_id_, table_id, index_id, 0, 0}, 
&job_key_begin);
         job_tablet_key({instance_id_, table_id, index_id + 1, 0, 0}, 
&job_key_end);
+        tablet_belongs = "index";
     }
 
-    LOG_INFO("begin to recycle tablets")
+    LOG_INFO("begin to recycle tablets of the " + tablet_belongs)
             .tag("table_id", table_id)
             .tag("index_id", index_id)
             .tag("partition_id", partition_id);
@@ -1186,7 +1220,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, 
int64_t index_id, int64_
 
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
         auto cost = duration<float>(steady_clock::now() - start_time).count();
-        LOG_INFO("recycle tablets finished, cost={}s", cost)
+        LOG_INFO("recycle tablets of " + tablet_belongs + " finished, 
cost={}s", cost)
                 .tag("instance_id", instance_id_)
                 .tag("table_id", table_id)
                 .tag("index_id", index_id)
@@ -1612,12 +1646,15 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) 
{
 
 int InstanceRecycler::recycle_rowsets() {
     const std::string task_name = "recycle_rowsets";
-    int num_scanned = 0;
-    int num_expired = 0;
-    int num_prepare = 0;
-    size_t total_rowset_size = 0;
+    int64_t num_scanned = 0;
+    int64_t num_expired = 0;
+    int64_t num_prepare = 0;
+    int64_t num_compacted = 0;
+    int64_t num_empty_rowset = 0;
+    size_t total_rowset_key_size = 0;
+    size_t total_rowset_value_size = 0;
     size_t expired_rowset_size = 0;
-    std::atomic_int num_recycled = 0;
+    std::atomic_long num_recycled = 0;
 
     RecycleRowsetKeyInfo recyc_rs_key_info0 {instance_id_, 0, ""};
     RecycleRowsetKeyInfo recyc_rs_key_info1 {instance_id_, INT64_MAX, ""};
@@ -1640,8 +1677,11 @@ int InstanceRecycler::recycle_rowsets() {
                 .tag("num_scanned", num_scanned)
                 .tag("num_expired", num_expired)
                 .tag("num_recycled", num_recycled)
-                .tag("num_prepare", num_prepare)
-                .tag("total_rowset_meta_size", total_rowset_size)
+                .tag("num_recycled.prepare", num_prepare)
+                .tag("num_recycled.compacted", num_compacted)
+                .tag("num_recycled.empty_rowset", num_empty_rowset)
+                .tag("total_rowset_meta_key_size_scanned", 
total_rowset_key_size)
+                .tag("total_rowset_meta_value_size_scanned", 
total_rowset_value_size)
                 .tag("expired_rowset_meta_size", expired_rowset_size);
     });
 
@@ -1692,9 +1732,11 @@ int InstanceRecycler::recycle_rowsets() {
         return 0;
     };
 
-    auto calc_expiration = [](const RecycleRowsetPB& rs) -> int64_t {
+    int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+    auto calc_expiration = [&earlest_ts, this](const RecycleRowsetPB& rs) {
         if (config::force_immediate_recycle) {
-            return 0;
+            return 0L;
         }
         // RecycleRowsetPB created by compacted or dropped rowset has no 
expiration time, and will be recycled when exceed retention time
         int64_t expiration = rs.expiration() > 0 ? rs.expiration() : 
rs.creation_time();
@@ -1703,12 +1745,18 @@ int InstanceRecycler::recycle_rowsets() {
             retention_seconds =
                     std::min(config::compacted_rowset_retention_seconds, 
retention_seconds);
         }
-        return expiration + retention_seconds;
+        int64_t final_expiration = expiration + retention_seconds;
+        if (earlest_ts > final_expiration) {
+            earlest_ts = final_expiration;
+            g_bvar_recycler_recycle_rowset_earlest_ts.put(instance_id_, 
earlest_ts);
+        }
+        return final_expiration;
     };
 
     auto handle_rowset_kv = [&](std::string_view k, std::string_view v) -> int 
{
         ++num_scanned;
-        total_rowset_size += v.size();
+        total_rowset_key_size += k.size();
+        total_rowset_value_size += v.size();
         RecycleRowsetPB rowset;
         if (!rowset.ParseFromArray(v.data(), v.size())) {
             LOG_WARNING("malformed recycle rowset").tag("key", hex(k));
@@ -1780,9 +1828,12 @@ int InstanceRecycler::recycle_rowsets() {
                 return -1;
             }
         } else {
+            num_compacted += rowset.type() == RecycleRowsetPB::COMPACT;
             rowset_keys.emplace_back(k);
             if (rowset_meta->num_segments() > 0) { // Skip empty rowset
                 rowsets.push_back(std::move(*rowset_meta));
+            } else {
+                ++num_empty_rowset;
             }
         }
         return 0;
@@ -1823,8 +1874,7 @@ int InstanceRecycler::recycle_rowsets() {
     return ret;
 }
 
-bool check_txn_abort(std::shared_ptr<TxnKv> txn_kv, const std::string& 
instance_id,
-                     int64_t txn_id) {
+bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv, const std::string& 
instance_id, int64_t txn_id) {
     std::unique_ptr<Transaction> txn;
     TxnErrorCode err = txn_kv->create_txn(&txn);
     if (err != TxnErrorCode::TXN_OK) {
@@ -1883,11 +1933,12 @@ bool check_txn_abort(std::shared_ptr<TxnKv> txn_kv, 
const std::string& instance_
 
 int InstanceRecycler::recycle_tmp_rowsets() {
     const std::string task_name = "recycle_tmp_rowsets";
-    int num_scanned = 0;
-    int num_expired = 0;
-    int num_recycled = 0;
+    int64_t num_scanned = 0;
+    int64_t num_expired = 0;
+    int64_t num_recycled = 0;
     size_t expired_rowset_size = 0;
-    size_t total_rowset_size = 0;
+    size_t total_rowset_key_size = 0;
+    size_t total_rowset_value_size = 0;
 
     MetaRowsetTmpKeyInfo tmp_rs_key_info0 {instance_id_, 0, 0};
     MetaRowsetTmpKeyInfo tmp_rs_key_info1 {instance_id_, INT64_MAX, 0};
@@ -1910,41 +1961,54 @@ int InstanceRecycler::recycle_tmp_rowsets() {
                 .tag("num_scanned", num_scanned)
                 .tag("num_expired", num_expired)
                 .tag("num_recycled", num_recycled)
-                .tag("total_rowset_meta_size", total_rowset_size)
-                .tag("expired_rowset_meta_size", expired_rowset_size);
+                .tag("total_rowset_meta_key_size_scanned", 
total_rowset_key_size)
+                .tag("total_rowset_meta_value_size_scanned", 
total_rowset_value_size)
+                .tag("expired_rowset_meta_size_recycled", expired_rowset_size);
     });
 
     // Elements in `tmp_rowset_keys` has the same lifetime as `it`
     std::vector<std::string_view> tmp_rowset_keys;
     std::vector<doris::RowsetMetaCloudPB> tmp_rowsets;
 
+    int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+    auto calc_expiration = [&earlest_ts, this](const doris::RowsetMetaCloudPB& 
rowset) {
+        // ATTN: `txn_expiration` should > 0, however we use `creation_time` + 
a large `retention_time` (> 1 day in production environment)
+        //  when `txn_expiration` <= 0 in some unexpected situation (usually 
when there are bugs). This is usually safe, coz loading
+        //  duration or timeout always < `retention_time` in practice.
+        int64_t expiration =
+                rowset.txn_expiration() > 0 ? rowset.txn_expiration() : 
rowset.creation_time();
+        expiration = config::force_immediate_recycle ? 0 : expiration;
+        int64_t final_expiration = expiration + config::retention_seconds;
+        if (earlest_ts > final_expiration) {
+            earlest_ts = final_expiration;
+            g_bvar_recycler_recycle_tmp_rowset_earlest_ts.put(instance_id_, 
earlest_ts);
+        }
+        return final_expiration;
+    };
+
     auto handle_rowset_kv = [&num_scanned, &num_expired, &tmp_rowset_keys, 
&tmp_rowsets,
-                             &expired_rowset_size, &total_rowset_size,
+                             &expired_rowset_size, &total_rowset_key_size, 
&total_rowset_value_size,
+                             &calc_expiration,
                              this](std::string_view k, std::string_view v) -> 
int {
         ++num_scanned;
-        total_rowset_size += v.size();
+        total_rowset_key_size += k.size();
+        total_rowset_value_size += v.size();
         doris::RowsetMetaCloudPB rowset;
         if (!rowset.ParseFromArray(v.data(), v.size())) {
             LOG_WARNING("malformed rowset meta").tag("key", hex(k));
             return -1;
         }
-        int64_t current_time = ::time(nullptr);
-        // ATTN: `txn_expiration` should > 0, however we use `creation_time` + 
a large `retention_time` (> 1 day in production environment)
-        //  when `txn_expiration` <= 0 in some unexpected situation (usually 
when there are bugs). This is usually safe, coz loading
-        //  duration or timeout always < `retention_time` in practice.
-        int64_t expiration = config::force_immediate_recycle ? 0
-                             : rowset.txn_expiration() > 0   ? 
rowset.txn_expiration()
-                                                             : 
rowset.creation_time();
+        int64_t expiration = calc_expiration(rowset);
         VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << " 
num_scanned=" << num_scanned
                    << " num_expired=" << num_expired << " expiration=" << 
expiration
                    << " txn_expiration=" << rowset.txn_expiration()
                    << " rowset_creation_time=" << rowset.creation_time();
-        if (current_time < expiration + config::retention_seconds) {
-            // not expired
+        int64_t current_time = ::time(nullptr);
+        if (current_time < expiration) { // not expired
             return 0;
         }
 
-        if (!check_txn_abort(txn_kv_, instance_id_, rowset.txn_id())) {
+        if (!is_txn_aborted(txn_kv_, instance_id_, rowset.txn_id())) {
             return 0;
         }
 
@@ -1964,7 +2028,9 @@ int InstanceRecycler::recycle_tmp_rowsets() {
                   << " tablet_id=" << rowset.tablet_id() << " rowset_id=" << 
rowset.rowset_id_v2()
                   << " version=[" << rowset.start_version() << '-' << 
rowset.end_version()
                   << "] txn_id=" << rowset.txn_id() << " rowset_meta_size=" << 
v.size()
-                  << " creation_time" << rowset.creation_time();
+                  << " creation_time=" << rowset.creation_time() << " 
num_scanned=" << num_scanned
+                  << " num_expired=" << num_expired;
+
         tmp_rowset_keys.push_back(k);
         if (rowset.num_segments() > 0) { // Skip empty rowset
             tmp_rowsets.push_back(std::move(rowset));
@@ -1997,31 +2063,57 @@ int InstanceRecycler::scan_and_recycle(
         std::string begin, std::string_view end,
         std::function<int(std::string_view k, std::string_view v)> 
recycle_func,
         std::function<int()> loop_done) {
+    LOG(INFO) << "begin scan_and_recycle key_range=[" << hex(begin) << "," << 
hex(end) << ")";
     int ret = 0;
+    int64_t cnt = 0;
+    int get_range_retried = 0;
+    std::string err;
+    std::unique_ptr<int, std::function<void(int*)>> defer_log(
+            (int*)0x01, [begin, end, &err, &ret, &cnt, 
&get_range_retried](int*) {
+                LOG(INFO) << "finish scan_and_recycle key_range=[" << 
hex(begin) << "," << hex(end)
+                          << ") num_scanned=" << cnt << " get_range_retried=" 
<< get_range_retried
+                          << " ret=" << ret << " err=" << err;
+            });
+
     std::unique_ptr<RangeGetIterator> it;
     do {
-        int get_ret = txn_get(txn_kv_.get(), begin, end, it);
-        if (get_ret != 0) {
-            LOG(WARNING) << "failed to get kv, key=" << begin << " ret=" << 
get_ret;
+        if (get_range_retried > 1000) {
+            err = "txn_get exceeds max retry, may not scan all keys";
+            ret = -1;
             return -1;
         }
-        VLOG_DEBUG << "fetch " << it->size() << " kv";
+        int get_ret = txn_get(txn_kv_.get(), begin, end, it);
+        if (get_ret != 0) { // txn kv may complain "Request for future version"
+            LOG(WARNING) << "failed to get kv, range=[" << hex(begin) << "," 
<< hex(end)
+                         << ") num_scanned=" << cnt << " txn_get_ret=" << 
get_ret
+                         << " get_range_retried=" << get_range_retried;
+            ++get_range_retried;
+            std::this_thread::sleep_for(std::chrono::milliseconds(500));
+            continue; // try again
+        }
         if (!it->has_next()) {
-            VLOG_DEBUG << "no keys in the given range, begin=" << hex(begin) 
<< " end=" << hex(end);
-            break;
+            LOG(INFO) << "no keys in the given range=[" << hex(begin) << "," 
<< hex(end) << ")";
+            break; // scan finished
         }
         while (it->has_next()) {
+            ++cnt;
             // recycle corresponding resources
             auto [k, v] = it->next();
             if (!it->has_next()) {
                 begin = k;
                 VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k);
             }
-            if (recycle_func(k, v) != 0) ret = -1;
+            // if we want to continue scanning, the recycle_func should not 
return non-zero
+            if (recycle_func(k, v) != 0) {
+                err = "recycle_func error";
+                ret = -1;
+            }
         }
         begin.push_back('\x00'); // Update to next smallest key for iteration
-        if (loop_done) {
-            if (loop_done() != 0) ret = -1;
+        // if we want to continue scanning, the loop_done should not return 
non-zero
+        if (loop_done && loop_done() != 0) {
+            err = "loop_done error";
+            ret = -1;
         }
     } while (it->more() && !stopped());
     return ret;
@@ -2029,10 +2121,10 @@ int InstanceRecycler::scan_and_recycle(
 
 int InstanceRecycler::abort_timeout_txn() {
     const std::string task_name = "abort_timeout_txn";
-    int num_scanned = 0;
-    int num_timeout = 0;
-    int num_abort = 0;
-    int num_advance = 0;
+    int64_t num_scanned = 0;
+    int64_t num_timeout = 0;
+    int64_t num_abort = 0;
+    int64_t num_advance = 0;
 
     TxnRunningKeyInfo txn_running_key_info0 {instance_id_, 0, 0};
     TxnRunningKeyInfo txn_running_key_info1 {instance_id_, INT64_MAX, 
INT64_MAX};
@@ -2169,9 +2261,9 @@ int InstanceRecycler::abort_timeout_txn() {
 
 int InstanceRecycler::recycle_expired_txn_label() {
     const std::string task_name = "recycle_expired_txn_label";
-    int num_scanned = 0;
-    int num_expired = 0;
-    int num_recycled = 0;
+    int64_t num_scanned = 0;
+    int64_t num_expired = 0;
+    int64_t num_recycled = 0;
 
     RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id_, 0, 0};
     RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id_, INT64_MAX, 
INT64_MAX};
@@ -2195,11 +2287,23 @@ int InstanceRecycler::recycle_expired_txn_label() {
                 .tag("num_recycled", num_recycled);
     });
 
-    int64_t current_time =
+    int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+    auto calc_expiration = [&earlest_ts, this](const RecycleTxnPB& 
recycle_txn_pb) {
+        int64_t final_expiration =
+                recycle_txn_pb.creation_time() + config::label_keep_max_second 
* 1000L;
+        if (earlest_ts > final_expiration / 1000) {
+            earlest_ts = final_expiration / 1000;
+            
g_bvar_recycler_recycle_expired_txn_label_earlest_ts.put(instance_id_, 
earlest_ts);
+        }
+        return final_expiration;
+    };
+
+    int64_t current_time_ms =
             
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
 
-    auto handle_recycle_txn_kv = [&num_scanned, &num_expired, &num_recycled, 
&current_time, this](
-                                         std::string_view k, std::string_view 
v) -> int {
+    auto handle_recycle_txn_kv = [&num_scanned, &num_expired, &num_recycled, 
&current_time_ms,
+                                  &calc_expiration,
+                                  this](std::string_view k, std::string_view 
v) -> int {
         ++num_scanned;
         RecycleTxnPB recycle_txn_pb;
         if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
@@ -2208,13 +2312,13 @@ int InstanceRecycler::recycle_expired_txn_label() {
         }
         if ((config::force_immediate_recycle) ||
             (recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
-            (recycle_txn_pb.creation_time() + config::label_keep_max_second * 
1000L <=
-             current_time)) {
-            LOG_INFO("found recycle txn").tag("key", hex(k));
+            (calc_expiration(recycle_txn_pb) <= current_time_ms)) {
+            VLOG_DEBUG << "found recycle txn, key=" << hex(k);
             num_expired++;
         } else {
             return 0;
         }
+
         std::string_view k1 = k;
         //RecycleTxnKeyInfo 0:instance_id  1:db_id  2:txn_id
         k1.remove_prefix(1); // Remove key space
@@ -2414,10 +2518,10 @@ private:
 };
 
 int InstanceRecycler::recycle_copy_jobs() {
-    int num_scanned = 0;
-    int num_finished = 0;
-    int num_expired = 0;
-    int num_recycled = 0;
+    int64_t num_scanned = 0;
+    int64_t num_finished = 0;
+    int64_t num_expired = 0;
+    int64_t num_recycled = 0;
     // Used for INTERNAL stage's copy jobs to tag each batch for log trace
     uint64_t batch_count = 0;
     const std::string task_name = "recycle_copy_jobs";
@@ -2659,8 +2763,8 @@ int InstanceRecycler::init_copy_job_accessor(const 
std::string& stage_id,
 }
 
 int InstanceRecycler::recycle_stage() {
-    int num_scanned = 0;
-    int num_recycled = 0;
+    int64_t num_scanned = 0;
+    int64_t num_recycled = 0;
     const std::string task_name = "recycle_stage";
 
     LOG_INFO("begin to recycle stage").tag("instance_id", instance_id_);
@@ -2680,12 +2784,9 @@ int InstanceRecycler::recycle_stage() {
 
     RecycleStageKeyInfo key_info0 {instance_id_, ""};
     RecycleStageKeyInfo key_info1 {instance_id_, "\xff"};
-    std::string key0;
-    std::string key1;
-    recycle_stage_key(key_info0, &key0);
-    recycle_stage_key(key_info1, &key1);
+    std::string key0 = recycle_stage_key(key_info0);
+    std::string key1 = recycle_stage_key(key_info1);
 
-    // Elements in `tmp_rowset_keys` has the same lifetime as `it`
     std::vector<std::string_view> stage_keys;
     auto recycle_func = [&start_time, &num_scanned, &num_recycled, 
&stage_keys, this](
                                 std::string_view k, std::string_view v) -> int 
{
@@ -2775,6 +2876,12 @@ int InstanceRecycler::recycle_expired_stage_objects() {
     });
     int ret = 0;
     for (const auto& stage : instance_info_.stages()) {
+        std::stringstream ss;
+        ss << "instance_id=" << instance_id_ << ", stage_id=" << 
stage.stage_id()
+           << ", user_name=" << stage.mysql_user_name().at(0)
+           << ", user_id=" << stage.mysql_user_id().at(0)
+           << ", prefix=" << stage.obj_info().prefix();
+
         if (stopped()) break;
         if (stage.type() == StagePB::EXTERNAL) {
             continue;
@@ -2788,7 +2895,7 @@ int InstanceRecycler::recycle_expired_stage_objects() {
         const auto& old_obj = instance_info_.obj_info()[idx - 1];
         auto s3_conf = S3Conf::from_obj_store_info(old_obj);
         if (!s3_conf) {
-            LOG(WARNING) << "failed to init accessor";
+            LOG(WARNING) << "failed to init s3_conf with obj_info=" << 
old_obj.DebugString();
             continue;
         }
 
@@ -2796,16 +2903,18 @@ int InstanceRecycler::recycle_expired_stage_objects() {
         std::shared_ptr<S3Accessor> accessor;
         int ret1 = S3Accessor::create(std::move(*s3_conf), &accessor);
         if (ret1 != 0) {
-            LOG(WARNING) << "failed to init s3 accessor ret=" << ret1;
+            LOG(WARNING) << "failed to init s3 accessor ret=" << ret1 << " " 
<< ss.str();
+            ret = -1;
+            continue;
+        }
+
+        if (s3_conf->prefix.find("/stage/") == std::string::npos) {
+            LOG(WARNING) << "try to delete illegal prefix, which is 
catastrophic, " << ss.str();
             ret = -1;
             continue;
         }
 
-        LOG(INFO) << "recycle expired stage objects, instance_id=" << 
instance_id_
-                  << ", stage_id=" << stage.stage_id()
-                  << ", user_name=" << stage.mysql_user_name().at(0)
-                  << ", user_id=" << stage.mysql_user_id().at(0)
-                  << ", prefix=" << stage.obj_info().prefix();
+        LOG(INFO) << "recycle expired stage objects, " << ss.str();
         int64_t expiration_time =
                 
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
                 config::internal_stage_objects_expire_time_second;
@@ -2814,8 +2923,8 @@ int InstanceRecycler::recycle_expired_stage_objects() {
         }
         ret1 = accessor->delete_all(expiration_time);
         if (ret1 != 0) {
-            LOG(WARNING) << "failed to recycle expired stage objects, 
instance_id=" << instance_id_
-                         << ", stage_id=" << stage.stage_id() << ", ret=" << 
ret1;
+            LOG(WARNING) << "failed to recycle expired stage objects, ret=" << 
ret1 << " "
+                         << ss.str();
             ret = -1;
             continue;
         }
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 91a461f474f..cf23dcacd2f 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -55,7 +55,9 @@ struct RecyclerThreadPoolGroup {
     RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup& other) = 
default;
     RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup&& other) = 
default;
     RecyclerThreadPoolGroup(RecyclerThreadPoolGroup&&) = default;
+    // used for accessor.delete_files, accessor.delete_directory
     std::shared_ptr<SimpleThreadPool> s3_producer_pool;
+    // used for InstanceRecycler::recycle_tablet
     std::shared_ptr<SimpleThreadPool> recycle_tablet_pool;
     std::shared_ptr<SimpleThreadPool> group_recycle_function_pool;
 };
@@ -128,19 +130,26 @@ public:
     // returns 0 for success otherwise error
     int recycle_deleted_instance();
 
-    // scan and recycle expired indexes
+    // scan and recycle expired indexes:
+    // 1. dropped table, dropped mv
+    // 2. half-success table/index when create
     // returns 0 for success otherwise error
     int recycle_indexes();
 
-    // scan and recycle expired partitions
+    // scan and recycle expired partitions:
+    // 1. dropped partition
+    // 2. half-success partition when create
     // returns 0 for success otherwise error
     int recycle_partitions();
 
-    // scan and recycle expired rowsets
+    // scan and recycle expired rowsets:
+    // 1. prepare_rowset will produce recycle_rowset before uploading data to 
remote storage (memo)
+    // 2. compaction will change the input rowsets to recycle_rowset
     // returns 0 for success otherwise error
     int recycle_rowsets();
 
-    // scan and recycle expired tmp rowsets
+    // scan and recycle expired tmp rowsets:
+    // 1. commit_rowset will produce tmp_rowset when it finishes uploading data (load 
or compaction) to remote storage
     // returns 0 for success otherwise error
     int recycle_tmp_rowsets();
 
@@ -203,12 +212,15 @@ private:
     int scan_and_recycle(std::string begin, std::string_view end,
                          std::function<int(std::string_view k, 
std::string_view v)> recycle_func,
                          std::function<int()> loop_done = nullptr);
+
     // return 0 for success otherwise error
     int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);
+
     // return 0 for success otherwise error
     // NOTE: this function ONLY be called when the file paths cannot be 
calculated
     int delete_rowset_data(const std::string& resource_id, int64_t tablet_id,
                            const std::string& rowset_id);
+
     // return 0 for success otherwise error
     int delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>& 
rowsets);
 
diff --git a/cloud/src/recycler/s3_accessor.cpp 
b/cloud/src/recycler/s3_accessor.cpp
index 1aca88d2d11..224b36c277c 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -282,6 +282,11 @@ int S3Accessor::init() {
         Aws::Client::ClientConfiguration aws_config;
         aws_config.endpointOverride = conf_.endpoint;
         aws_config.region = conf_.region;
+        // Aws::Http::CurlHandleContainer::AcquireCurlHandle() may be blocked 
if the connections are a bottleneck
+        aws_config.maxConnections = 
std::max((long)(config::recycle_pool_parallelism +
+                                                    
config::instance_recycler_worker_pool_size),
+                                             (long)aws_config.maxConnections);
+
         if (config::s3_client_http_scheme == "http") {
             aws_config.scheme = Aws::Http::Scheme::HTTP;
         }
@@ -349,7 +354,12 @@ int S3Accessor::delete_files(const 
std::vector<std::string>& paths) {
 
 int S3Accessor::delete_file(const std::string& path) {
     LOG_INFO("delete file").tag("uri", to_uri(path));
-    return obj_client_->delete_object({.bucket = conf_.bucket, .key = 
get_key(path)}).ret;
+    int ret = obj_client_->delete_object({.bucket = conf_.bucket, .key = 
get_key(path)}).ret;
+    static_assert(ObjectStorageResponse::OK == 0);
+    if (ret == ObjectStorageResponse::OK || ret == 
ObjectStorageResponse::NOT_FOUND) {
+        return 0;
+    }
+    return ret;
 }
 
 int S3Accessor::put_file(const std::string& path, const std::string& content) {
@@ -392,21 +402,45 @@ int S3Accessor::check_versioning() {
 }
 
 int GcsAccessor::delete_prefix_impl(const std::string& path_prefix, int64_t 
expiration_time) {
-    LOG_INFO("delete prefix").tag("uri", to_uri(path_prefix));
+    LOG_INFO("begin delete prefix").tag("uri", to_uri(path_prefix));
 
     int ret = 0;
+    int cnt = 0;
+    int skip = 0;
+    int64_t del_nonexisted = 0;
+    int del = 0;
     auto iter = obj_client_->list_objects({conf_.bucket, 
get_key(path_prefix)});
     for (auto obj = iter->next(); obj.has_value(); obj = iter->next()) {
+        if (!(++cnt % 100)) {
+            LOG_INFO("loop delete prefix")
+                    .tag("uri", to_uri(path_prefix))
+                    .tag("total_obj_cnt", cnt)
+                    .tag("deleted", del)
+                    .tag("del_nonexisted", del_nonexisted)
+                    .tag("skipped", skip);
+        }
         if (expiration_time > 0 && obj->mtime_s > expiration_time) {
+            skip++;
             continue;
         }
+        del++;
 
-        // FIXME(plat1ko): Delete objects by batch
-        if (int del_ret = obj_client_->delete_object({conf_.bucket, 
obj->key}).ret; del_ret != 0) {
+        // FIXME(plat1ko): Delete objects by batch with genuine GCS client
+        int del_ret = obj_client_->delete_object({conf_.bucket, obj->key}).ret;
+        del_nonexisted += (del_ret == ObjectStorageResponse::NOT_FOUND);
+        static_assert(ObjectStorageResponse::OK == 0);
+        if (del_ret != ObjectStorageResponse::OK && del_ret != 
ObjectStorageResponse::NOT_FOUND) {
             ret = del_ret;
         }
     }
 
+    LOG_INFO("finish delete prefix")
+            .tag("uri", to_uri(path_prefix))
+            .tag("total_obj_cnt", cnt)
+            .tag("deleted", del)
+            .tag("del_nonexisted", del_nonexisted)
+            .tag("skipped", skip);
+
     if (!iter->is_valid()) {
         return -1;
     }
diff --git a/cloud/src/recycler/s3_obj_client.cpp 
b/cloud/src/recycler/s3_obj_client.cpp
index c8dcdad18d7..0e548819d25 100644
--- a/cloud/src/recycler/s3_obj_client.cpp
+++ b/cloud/src/recycler/s3_obj_client.cpp
@@ -293,9 +293,12 @@ ObjectStorageResponse 
S3ObjClient::delete_object(ObjectStoragePathRef path) {
                 .tag("responseCode", 
static_cast<int>(outcome.GetError().GetResponseCode()))
                 .tag("error", outcome.GetError().GetMessage())
                 .tag("exception", outcome.GetError().GetExceptionName());
-        return -1;
+        if (outcome.GetError().GetResponseCode() == 
Aws::Http::HttpResponseCode::NOT_FOUND) {
+            return {ObjectStorageResponse::NOT_FOUND, 
outcome.GetError().GetMessage()};
+        }
+        return {ObjectStorageResponse::UNDEFINED, 
outcome.GetError().GetMessage()};
     }
-    return 0;
+    return {ObjectStorageResponse::OK};
 }
 
 ObjectStorageResponse 
S3ObjClient::delete_objects_recursively(ObjectStoragePathRef path,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to