This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new b6b785f99bd improve spill counters (#41724)
b6b785f99bd is described below

commit b6b785f99bde491e1a74e7dfcbc40854e581aa8c
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Mon Oct 14 11:28:14 2024 +0800

    improve spill counters (#41724)
---
 be/src/pipeline/dependency.cpp                     |  22 +++
 be/src/pipeline/dependency.h                       |  35 ++++
 be/src/pipeline/exec/aggregation_sink_operator.cpp |   1 -
 .../distinct_streaming_aggregation_operator.cpp    |   1 -
 be/src/pipeline/exec/operator.h                    | 214 +++++++++++++++++----
 .../exec/partitioned_aggregation_sink_operator.cpp |  14 +-
 .../exec/partitioned_aggregation_sink_operator.h   |  11 +-
 .../partitioned_aggregation_source_operator.cpp    |  29 +--
 .../exec/partitioned_hash_join_probe_operator.cpp  |  93 +++++----
 .../exec/partitioned_hash_join_probe_operator.h    |   5 -
 .../exec/partitioned_hash_join_sink_operator.cpp   | 125 ++++++++----
 .../exec/partitioned_hash_join_sink_operator.h     |   1 +
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  45 +++--
 be/src/pipeline/exec/spill_sort_sink_operator.h    |   1 +
 .../pipeline/exec/spill_sort_source_operator.cpp   |  42 ++--
 be/src/pipeline/exec/spill_sort_source_operator.h  |   4 -
 be/src/pipeline/exec/spill_utils.h                 |  35 +++-
 .../exec/streaming_aggregation_operator.cpp        |   1 -
 be/src/vec/core/block.h                            |  12 +-
 be/src/vec/spill/spill_reader.cpp                  |  16 +-
 be/src/vec/spill/spill_reader.h                    |  24 ++-
 be/src/vec/spill/spill_stream.cpp                  |  40 ++--
 be/src/vec/spill/spill_stream.h                    |  32 +--
 be/src/vec/spill/spill_stream_manager.cpp          |   1 +
 be/src/vec/spill/spill_writer.cpp                  |  42 ++--
 be/src/vec/spill/spill_writer.h                    |  44 +++--
 26 files changed, 603 insertions(+), 287 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 315e76a2426..cdde0c6330c 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -327,6 +327,19 @@ void PartitionedAggSharedState::init_spill_params(size_t 
spill_partition_count_b
     }
 }
 
+void PartitionedAggSharedState::update_spill_stream_profiles(RuntimeProfile* 
source_profile) {
+    for (auto& partition : spill_partitions) {
+        if (partition->spilling_stream_) {
+            
partition->spilling_stream_->update_shared_profiles(source_profile);
+        }
+        for (auto& stream : partition->spill_streams_) {
+            if (stream) {
+                stream->update_shared_profiles(source_profile);
+            }
+        }
+    }
+}
+
 Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id,
                                            RuntimeProfile* profile,
                                            vectorized::SpillStreamSPtr& 
spill_stream) {
@@ -339,6 +352,7 @@ Status AggSpillPartition::get_spill_stream(RuntimeState* 
state, int node_id,
             std::numeric_limits<int32_t>::max(), 
std::numeric_limits<size_t>::max(), profile));
     spill_streams_.emplace_back(spilling_stream_);
     spill_stream = spilling_stream_;
+    spill_stream->set_write_counters(profile);
     return Status::OK();
 }
 void AggSpillPartition::close() {
@@ -365,6 +379,14 @@ void PartitionedAggSharedState::close() {
     spill_partitions.clear();
 }
 
+void SpillSortSharedState::update_spill_stream_profiles(RuntimeProfile* 
source_profile) {
+    for (auto& stream : sorted_streams) {
+        if (stream) {
+            stream->update_shared_profiles(source_profile);
+        }
+    }
+}
+
 void SpillSortSharedState::close() {
     // need to use CAS instead of only `if (!is_closed)` statement,
     // to avoid concurrent entry of close() both pass the if statement
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 3cf1cbff9fa..20251aed294 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -426,14 +426,36 @@ private:
     Status _destroy_agg_status(vectorized::AggregateDataPtr data);
 };
 
+struct BasicSpillSharedState {
+    virtual ~BasicSpillSharedState() = default;
+
+    // These two counters are shared with spill source operators as the 
initial values
+    // of 'SpillWriteFileCurrentSize' and 'SpillWriteFileCurrentCount'.
+    // Total bytes of spill data written to disk files (after serialization)
+    RuntimeProfile::Counter* _spill_write_file_data_size = nullptr;
+    RuntimeProfile::Counter* _spill_file_total_count = nullptr;
+
+    void setup_shared_profile(RuntimeProfile* sink_profile) {
+        _spill_file_total_count =
+                ADD_COUNTER_WITH_LEVEL(sink_profile, 
"SpillWriteFileTotalCount", TUnit::UNIT, 1);
+        _spill_write_file_data_size =
+                ADD_COUNTER_WITH_LEVEL(sink_profile, 
"SpillWriteFileTotalSize", TUnit::BYTES, 1);
+    }
+
+    virtual void update_spill_stream_profiles(RuntimeProfile* source_profile) 
= 0;
+};
+
 struct AggSpillPartition;
 struct PartitionedAggSharedState : public BasicSharedState,
+                                   public BasicSpillSharedState,
                                    public 
std::enable_shared_from_this<PartitionedAggSharedState> {
     ENABLE_FACTORY_CREATOR(PartitionedAggSharedState)
 
     PartitionedAggSharedState() = default;
     ~PartitionedAggSharedState() override = default;
 
+    void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
+
     void init_spill_params(size_t spill_partition_count_bits);
 
     void close();
@@ -498,6 +520,7 @@ public:
 };
 
 struct SpillSortSharedState : public BasicSharedState,
+                              public BasicSpillSharedState,
                               public 
std::enable_shared_from_this<SpillSortSharedState> {
     ENABLE_FACTORY_CREATOR(SpillSortSharedState)
 
@@ -515,6 +538,9 @@ struct SpillSortSharedState : public BasicSharedState,
             LOG(INFO) << "spill sort block batch row count: " << 
spill_block_batch_row_count;
         }
     }
+
+    void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
+
     void close();
 
     SortSharedState* in_mem_shared_state = nullptr;
@@ -622,9 +648,18 @@ struct HashJoinSharedState : public JoinSharedState {
 
 struct PartitionedHashJoinSharedState
         : public HashJoinSharedState,
+          public BasicSpillSharedState,
           public std::enable_shared_from_this<PartitionedHashJoinSharedState> {
     ENABLE_FACTORY_CREATOR(PartitionedHashJoinSharedState)
 
+    void update_spill_stream_profiles(RuntimeProfile* source_profile) override 
{
+        for (auto& stream : spilled_streams) {
+            if (stream) {
+                stream->update_shared_profiles(source_profile);
+            }
+        }
+    }
+
     std::unique_ptr<RuntimeState> inner_runtime_state;
     std::shared_ptr<HashJoinSharedState> inner_shared_state;
     std::vector<std::unique_ptr<vectorized::MutableBlock>> 
partitioned_build_blocks;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 3dd43c3c4d8..79d1460cd06 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -66,7 +66,6 @@ Status AggSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
 
     _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
     _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
-    _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
     _merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
     _expr_timer = ADD_TIMER(Base::profile(), "ExprTime");
     _serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime");
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 79347064048..b0e635bef84 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -72,7 +72,6 @@ Status DistinctStreamingAggLocalState::init(RuntimeState* 
state, LocalStateInfo&
     SCOPED_TIMER(Base::exec_time_counter());
     SCOPED_TIMER(Base::_init_timer);
     _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
-    _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
     _hash_table_compute_timer = ADD_TIMER(Base::profile(), 
"HashTableComputeTime");
     _hash_table_emplace_timer = ADD_TIMER(Base::profile(), 
"HashTableEmplaceTime");
     _hash_table_input_counter = ADD_COUNTER(Base::profile(), 
"HashTableInputCount", TUnit::UNIT);
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 649bc70c238..da0cc008f06 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -287,28 +287,137 @@ public:
 
     Status init(RuntimeState* state, LocalStateInfo& info) override {
         RETURN_IF_ERROR(PipelineXLocalState<SharedStateArg>::init(state, 
info));
-        _spill_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTime", 1);
-        _spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillRecoverTime", 1);
-        _spill_read_data_time = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillReadDataTime", 1);
-        _spill_deserialize_time = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillDeserializeTime", 1);
-        _spill_read_bytes =
-                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize", 
TUnit::BYTES, 1);
-        _spill_wait_in_queue_timer =
-                ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", 
1);
-        _spill_write_wait_io_timer =
-                ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", 
1);
-        _spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillReadWaitIOTime", 1);
+
+        _spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillTotalTime", 1);
+        init_spill_read_counters();
+
         return Status::OK();
     }
 
-    RuntimeProfile::Counter* _spill_timer = nullptr;
-    RuntimeProfile::Counter* _spill_recover_time;
-    RuntimeProfile::Counter* _spill_read_data_time;
-    RuntimeProfile::Counter* _spill_deserialize_time;
-    RuntimeProfile::Counter* _spill_read_bytes;
-    RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
-    RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
-    RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
+    void init_spill_write_counters() {
+        _spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteTime", 1);
+
+        _spill_write_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL(
+                Base::profile(), "SpillWriteTaskWaitInQueueCount", 
TUnit::UNIT, 1);
+        _spill_writing_task_count =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteTaskCount", 
TUnit::UNIT, 1);
+        _spill_write_wait_in_queue_timer =
+                ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteTaskWaitInQueueTime", 1);
+
+        _spill_write_file_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteFileTime", 1);
+
+        _spill_write_serialize_block_timer =
+                ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteSerializeBlockTime", 1);
+        _spill_write_block_count =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteBlockCount", TUnit::UNIT, 1);
+        _spill_write_block_data_size =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteBlockDataSize", TUnit::BYTES, 1);
+        _spill_write_file_data_size =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteFileTotalSize", TUnit::BYTES, 1);
+        _spill_write_rows_count =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows", 
TUnit::UNIT, 1);
+        _spill_file_total_count =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteFileTotalCount", TUnit::UNIT, 1);
+    }
+
+    void init_spill_read_counters() {
+        // Spill read counters
+        _spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillRecoverTime", 1);
+
+        _spill_read_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL(
+                Base::profile(), "SpillReadTaskWaitInQueueCount", TUnit::UNIT, 
1);
+        _spill_reading_task_count =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadTaskCount", 
TUnit::UNIT, 1);
+        _spill_read_wait_in_queue_timer =
+                ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillReadTaskWaitInQueueTime", 1);
+
+        _spill_read_file_time = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillReadFileTime", 1);
+        _spill_read_derialize_block_timer =
+                ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillReadDerializeBlockTime", 1);
+
+        _spill_read_block_count =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockCount", 
TUnit::UNIT, 1);
+        _spill_read_block_data_size =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillReadBlockDataSize", TUnit::BYTES, 1);
+        _spill_read_file_size =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileSize", 
TUnit::BYTES, 1);
+        _spill_read_rows_count =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadRows", 
TUnit::UNIT, 1);
+        _spill_read_file_count =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileCount", 
TUnit::UNIT, 1);
+
+        _spill_file_current_size = ADD_COUNTER_WITH_LEVEL(
+                Base::profile(), "SpillWriteFileCurrentSize", TUnit::BYTES, 1);
+        _spill_file_current_count = ADD_COUNTER_WITH_LEVEL(
+                Base::profile(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
+    }
+
+    // Runs once per source operator: initializes the counters 
'SpillWriteFileCurrentSize' and
+    // 'SpillWriteFileCurrentCount' from the shared spill sink counters 
"SpillWriteFileTotalSize"
+    // and "SpillWriteFileTotalCount", then updates the shared profiles of the 
spill streams.
+    void copy_shared_spill_profile() {
+        if (_copy_shared_spill_profile) {
+            _copy_shared_spill_profile = false;
+            const auto* spill_shared_state = (const 
BasicSpillSharedState*)Base::_shared_state;
+            COUNTER_SET(_spill_file_current_size,
+                        
spill_shared_state->_spill_write_file_data_size->value());
+            COUNTER_SET(_spill_file_current_count,
+                        spill_shared_state->_spill_file_total_count->value());
+            Base::_shared_state->update_spill_stream_profiles(Base::profile());
+        }
+    }
+
+    // Total time of spill, including spill task scheduling time,
+    // serialize block time, write disk file time,
+    // and read disk file time, deserialize block time etc.
+    RuntimeProfile::Counter* _spill_total_timer = nullptr;
+
+    // Spill write counters
+    // Total time of spill write, including serialize block time, write disk 
file,
+    // and wait in queue time, etc.
+    RuntimeProfile::Counter* _spill_write_timer = nullptr;
+
+    RuntimeProfile::Counter* _spill_write_wait_in_queue_task_count = nullptr;
+    RuntimeProfile::Counter* _spill_writing_task_count = nullptr;
+    RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr;
+
+    // Total time of writing file
+    RuntimeProfile::Counter* _spill_write_file_timer = nullptr;
+    RuntimeProfile::Counter* _spill_write_serialize_block_timer = nullptr;
+    // Original count of spilled Blocks
+    // One big Block may be split into multiple small Blocks when actually 
written to disk file.
+    RuntimeProfile::Counter* _spill_write_block_count = nullptr;
+    // Total bytes of spill data in Block format(in memory format)
+    RuntimeProfile::Counter* _spill_write_block_data_size = nullptr;
+    // Total bytes of spill data written to disk file(after serialized)
+    RuntimeProfile::Counter* _spill_write_file_data_size = nullptr;
+    RuntimeProfile::Counter* _spill_write_rows_count = nullptr;
+    RuntimeProfile::Counter* _spill_file_total_count = nullptr;
+    RuntimeProfile::Counter* _spill_file_current_count = nullptr;
+    // Spilled file total size
+    RuntimeProfile::Counter* _spill_file_total_size = nullptr;
+    // Current spilled file size
+    RuntimeProfile::Counter* _spill_file_current_size = nullptr;
+
+    // Spill read counters
+    // Total time of recovering spilled data, including read file time, 
deserialize time, etc.
+    RuntimeProfile::Counter* _spill_recover_time = nullptr;
+
+    RuntimeProfile::Counter* _spill_read_wait_in_queue_task_count = nullptr;
+    RuntimeProfile::Counter* _spill_reading_task_count = nullptr;
+    RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;
+
+    RuntimeProfile::Counter* _spill_read_file_time = nullptr;
+    RuntimeProfile::Counter* _spill_read_derialize_block_timer = nullptr;
+    RuntimeProfile::Counter* _spill_read_block_count = nullptr;
+    // Total bytes of read data in Block format(in memory format)
+    RuntimeProfile::Counter* _spill_read_block_data_size = nullptr;
+    // Total bytes of spill data read from disk file
+    RuntimeProfile::Counter* _spill_read_file_size = nullptr;
+    RuntimeProfile::Counter* _spill_read_rows_count = nullptr;
+    RuntimeProfile::Counter* _spill_read_file_count = nullptr;
+
+    bool _copy_shared_spill_profile = true;
 };
 
 class DataSinkOperatorXBase;
@@ -589,19 +698,28 @@ public:
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
         RETURN_IF_ERROR(Base::init(state, info));
 
-        _spill_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTime", 1);
-        _spill_serialize_block_timer =
-                ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillSerializeBlockTime", 1);
-        _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteDiskTime", 1);
-        _spill_data_size =
-                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", 
TUnit::BYTES, 1);
-        _spill_block_count =
+        _spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillTotalTime", 1);
+
+        _spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteTime", 1);
+
+        _spill_write_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL(
+                Base::profile(), "SpillWriteTaskWaitInQueueCount", 
TUnit::UNIT, 1);
+        _spill_writing_task_count =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteTaskCount", 
TUnit::UNIT, 1);
+        _spill_write_wait_in_queue_timer =
+                ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteTaskWaitInQueueTime", 1);
+
+        _spill_write_file_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteFileTime", 1);
+
+        _spill_write_serialize_block_timer =
+                ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteSerializeBlockTime", 1);
+        _spill_write_block_count =
                 ADD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteBlockCount", TUnit::UNIT, 1);
-        _spill_wait_in_queue_timer =
-                ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", 
1);
-        _spill_write_wait_io_timer =
-                ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", 
1);
-        _spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillReadWaitIOTime", 1);
+        _spill_write_block_data_size =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteBlockDataSize", TUnit::BYTES, 1);
+        _spill_write_rows_count =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows", 
TUnit::UNIT, 1);
+
         _spill_max_rows_of_partition =
                 ADD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillMaxRowsOfPartition", TUnit::UNIT, 1);
         _spill_min_rows_of_partition =
@@ -633,14 +751,32 @@ public:
 
     std::vector<int64_t> _rows_in_partitions;
 
-    RuntimeProfile::Counter* _spill_timer = nullptr;
-    RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
-    RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
-    RuntimeProfile::Counter* _spill_data_size = nullptr;
-    RuntimeProfile::Counter* _spill_block_count = nullptr;
-    RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
-    RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
-    RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
+    // Total time of spill, including spill task scheduling time,
+    // serialize block time, write disk file time,
+    // and read disk file time, deserialize block time etc.
+    RuntimeProfile::Counter* _spill_total_timer = nullptr;
+
+    // Spill write counters
+    // Total time of spill write, including serialize block time, write disk 
file,
+    // and wait in queue time, etc.
+    RuntimeProfile::Counter* _spill_write_timer = nullptr;
+
+    RuntimeProfile::Counter* _spill_write_wait_in_queue_task_count = nullptr;
+    RuntimeProfile::Counter* _spill_writing_task_count = nullptr;
+    RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr;
+
+    // Total time of writing file
+    RuntimeProfile::Counter* _spill_write_file_timer = nullptr;
+    RuntimeProfile::Counter* _spill_write_serialize_block_timer = nullptr;
+    // Original count of spilled Blocks
+    // One big Block may be split into multiple small Blocks when actually 
written to disk file.
+    RuntimeProfile::Counter* _spill_write_block_count = nullptr;
+    // Total bytes of spill data in Block format(in memory format)
+    RuntimeProfile::Counter* _spill_write_block_data_size = nullptr;
+    RuntimeProfile::Counter* _spill_write_rows_count = nullptr;
+    // Spilled file total size
+    RuntimeProfile::Counter* _spill_file_total_size = nullptr;
+
     RuntimeProfile::Counter* _spill_max_rows_of_partition = nullptr;
     RuntimeProfile::Counter* _spill_min_rows_of_partition = nullptr;
 };
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 1a86fdb2a9d..f5c09459f85 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -76,6 +76,7 @@ Status 
PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
 Status PartitionedAggSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(Base::exec_time_counter());
     SCOPED_TIMER(Base::_open_timer);
+    _shared_state->setup_shared_profile(_profile);
     return Base::open(state);
 }
 
@@ -301,8 +302,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(
 
     state->get_query_ctx()->increase_revoking_tasks_count();
     auto spill_runnable = std::make_shared<SpillRunnable>(
-            state, _shared_state->shared_from_this(),
+            state, _profile, true, _shared_state->shared_from_this(),
             [this, &parent, state, query_id, size_to_revoke, spill_context, 
submit_timer] {
+                auto submit_elapsed_time = submit_timer.elapsed_time();
+                _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
+                exec_time_counter()->update(submit_elapsed_time);
+                _spill_total_timer->update(submit_elapsed_time);
+
+                SCOPED_TIMER(exec_time_counter());
+                SCOPED_TIMER(_spill_total_timer);
+                SCOPED_TIMER(_spill_write_timer);
+
                 
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
                     auto st = Status::InternalError(
                             "fault_inject partitioned_agg_sink "
@@ -310,8 +320,6 @@ Status PartitionedAggSinkLocalState::revoke_memory(
                     
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st);
                     return st;
                 });
-                
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                SCOPED_TIMER(Base::_spill_timer);
                 Defer defer {[&]() {
                     if (!_shared_state->sink_status.ok() || 
state->is_cancelled()) {
                         if (!_shared_state->sink_status.ok()) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 9b70da54943..fa32e032303 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -150,10 +150,6 @@ public:
         auto status = spill_partition->get_spill_stream(state, 
Base::_parent->node_id(),
                                                         Base::profile(), 
spill_stream);
         RETURN_IF_ERROR(status);
-        spill_stream->set_write_counters(Base::_spill_serialize_block_timer,
-                                         Base::_spill_block_count, 
Base::_spill_data_size,
-                                         Base::_spill_write_disk_timer,
-                                         Base::_spill_write_wait_io_timer, 
memory_used_counter());
 
         status = to_block(context, keys, values, null_key_data);
         RETURN_IF_ERROR(status);
@@ -168,14 +164,9 @@ public:
             keys.clear();
             values.clear();
         }
-        status = spill_stream->prepare_spill();
+        status = spill_stream->spill_block(state, block_, false);
         RETURN_IF_ERROR(status);
 
-        {
-            SCOPED_TIMER(_spill_write_disk_timer);
-            status = spill_stream->spill_block(state, block_, false);
-        }
-        RETURN_IF_ERROR(status);
         status = spill_partition->flush_if_full();
         _reset_tmp_data();
         return status;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index c6a6c09f01b..1b49c0d3768 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -137,6 +137,7 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* 
state) {
 Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                 bool* eos) {
     auto& local_state = get_local_state(state);
+    local_state.copy_shared_spill_profile();
     Defer defer {[&]() {
         if (!local_state._status.ok() || *eos) {
             local_state._shared_state->close();
@@ -226,8 +227,6 @@ Status 
PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
 
 Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, 
bool& has_data) {
     const auto query_id = state->query_id();
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
 
     if (_shared_state->spill_partitions.empty()) {
         _shared_state->close();
@@ -236,10 +235,7 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
     }
 
     has_data = true;
-    auto spill_func = [this, state, query_id, submit_timer] {
-        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-        MonotonicStopWatch execution_timer;
-        execution_timer.start();
+    auto spill_func = [this, state, query_id] {
         Defer defer {[&]() {
             if (!_status.ok() || state->is_cancelled()) {
                 if (!_status.ok()) {
@@ -255,14 +251,11 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
         while (!state->is_cancelled() && !has_agg_data &&
                !_shared_state->spill_partitions.empty()) {
             for (auto& stream : 
_shared_state->spill_partitions[0]->spill_streams_) {
-                stream->set_read_counters(Base::_spill_read_data_time,
-                                          Base::_spill_deserialize_time, 
Base::_spill_read_bytes,
-                                          Base::_spill_read_wait_io_timer);
+                stream->set_read_counters(profile());
                 vectorized::Block block;
                 bool eos = false;
                 while (!eos && !state->is_cancelled()) {
                     {
-                        SCOPED_TIMER(Base::_spill_recover_time);
                         
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data",
                                         {
                                             _status = 
Status::Error<INTERNAL_ERROR>(
@@ -298,7 +291,18 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
         return _status;
     };
 
-    auto exception_catch_func = [spill_func, query_id, this]() {
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+    auto exception_catch_func = [spill_func, query_id, submit_timer, this]() {
+        auto submit_elapsed_time = submit_timer.elapsed_time();
+        _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
+        exec_time_counter()->update(submit_elapsed_time);
+        _spill_total_timer->update(submit_elapsed_time);
+
+        SCOPED_TIMER(exec_time_counter());
+        SCOPED_TIMER(_spill_total_timer);
+        SCOPED_TIMER(_spill_recover_time);
+
         
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel",
 {
             auto st = Status::InternalError(
                     "fault_inject partitioned_agg_source "
@@ -320,7 +324,8 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
     });
     _spill_dependency->block();
     return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
-            std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
+            std::make_shared<SpillRunnable>(state, _runtime_profile.get(), 
false,
+                                            _shared_state->shared_from_this(),
                                             exception_catch_func));
 }
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 7b60c9a3e2f..6df2a7ca4ab 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -42,6 +42,8 @@ 
PartitionedHashJoinProbeLocalState::PartitionedHashJoinProbeLocalState(RuntimeSt
 
 Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(PipelineXSpillLocalState::init(state, info));
+    init_spill_write_counters();
+
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     _internal_runtime_profile.reset(new RuntimeProfile("internal_profile"));
@@ -75,14 +77,6 @@ Status 
PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
 
     _probe_blocks_bytes = ADD_COUNTER_WITH_LEVEL(profile(), 
"ProbeBlocksBytes", TUnit::BYTES, 1);
 
-    _spill_serialize_block_timer =
-            ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", 
1);
-    _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteDiskTime", 1);
-    _spill_data_size =
-            ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", 
TUnit::BYTES, 1);
-    _spill_block_count =
-            ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", 
TUnit::UNIT, 1);
-
     // Build phase
     _build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase");
     _build_rows_counter = ADD_CHILD_COUNTER(profile(), "BuildRows", 
TUnit::UNIT, "BuildPhase");
@@ -182,11 +176,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     auto query_id = state->query_id();
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-
-    auto spill_func = [query_id, state, submit_timer, this] {
-        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+    auto spill_func = [query_id, state, this] {
         SCOPED_TIMER(_spill_probe_timer);
 
         size_t not_revoked_size = 0;
@@ -215,10 +205,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
                         state, spilling_stream, print_id(state->query_id()), 
"hash_probe",
                         _parent->node_id(), 
std::numeric_limits<int32_t>::max(),
                         std::numeric_limits<size_t>::max(), 
_runtime_profile.get()));
-                RETURN_IF_ERROR(spilling_stream->prepare_spill());
-                spilling_stream->set_write_counters(
-                        _spill_serialize_block_timer, _spill_block_count, 
_spill_data_size,
-                        _spill_write_disk_timer, _spill_write_wait_io_timer, 
memory_used_counter());
+                spilling_stream->set_write_counters(_runtime_profile.get());
             }
 
             auto merged_block = 
vectorized::MutableBlock::create_unique(blocks[0].clone_empty());
@@ -256,8 +243,19 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
         return Status::OK();
     };
 
-    auto exception_catch_func = [query_id, spill_func, spill_context, this]() {
-        SCOPED_TIMER(_spill_timer);
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
+    auto exception_catch_func = [query_id, spill_func, spill_context, 
submit_timer, this]() {
+        auto submit_elapsed_time = submit_timer.elapsed_time();
+        _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
+        exec_time_counter()->update(submit_elapsed_time);
+        _spill_total_timer->update(submit_elapsed_time);
+
+        SCOPED_TIMER(exec_time_counter());
+        SCOPED_TIMER(_spill_total_timer);
+        SCOPED_TIMER(_spill_write_timer);
+
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
 {
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                     query_id, Status::InternalError("fault_inject 
partitioned_hash_join_probe "
@@ -286,7 +284,8 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
                 "fault_inject partitioned_hash_join_probe spill_probe_blocks 
submit_func failed");
     });
 
-    auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
+    auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_runtime_profile.get(), true,
+                                                          
_shared_state->shared_from_this(),
                                                           
exception_catch_func);
     return spill_io_pool->submit(std::move(spill_runnable));
 }
@@ -296,8 +295,7 @@ Status 
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
 
     if (probe_spilling_stream) {
         RETURN_IF_ERROR(probe_spilling_stream->spill_eof());
-        probe_spilling_stream->set_read_counters(_spill_read_data_time, 
_spill_deserialize_time,
-                                                 _spill_read_bytes, 
_spill_read_wait_io_timer);
+        probe_spilling_stream->set_read_counters(profile());
     }
 
     return Status::OK();
@@ -314,20 +312,15 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
     if (!spilled_stream) {
         return Status::OK();
     }
-
-    spilled_stream->set_read_counters(_spill_read_data_time, 
_spill_deserialize_time,
-                                      _spill_read_bytes, 
_spill_read_wait_io_timer);
+    spilled_stream->set_read_counters(profile());
 
     std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
             _shared_state->shared_from_this();
 
     auto query_id = state->query_id();
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-
     auto read_func = [this, query_id, state, spilled_stream = spilled_stream, 
shared_state_holder,
-                      submit_timer, partition_index] {
+                      partition_index] {
         auto shared_state_sptr = shared_state_holder.lock();
         if (!shared_state_sptr || state->is_cancelled()) {
             LOG(INFO) << "query: " << print_id(query_id)
@@ -335,7 +328,6 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
             return;
         }
 
-        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         SCOPED_TIMER(_recovery_build_timer);
 
         bool eos = false;
@@ -396,7 +388,19 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
         }
     };
 
-    auto exception_catch_func = [read_func, query_id, this]() {
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
+    auto exception_catch_func = [read_func, query_id, submit_timer, this]() {
+        auto submit_elapsed_time = submit_timer.elapsed_time();
+        _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
+        exec_time_counter()->update(submit_elapsed_time);
+        _spill_total_timer->update(submit_elapsed_time);
+
+        SCOPED_TIMER(exec_time_counter());
+        SCOPED_TIMER(_spill_total_timer);
+        SCOPED_TIMER(_spill_recover_time);
+
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel",
 {
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                     query_id, Status::InternalError("fault_inject 
partitioned_hash_join_probe "
@@ -436,7 +440,8 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
                                 "fault_inject partitioned_hash_join_probe "
                                 "recovery_build_blocks submit_func failed");
                     });
-    auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
+    auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_runtime_profile.get(), false,
+                                                          
_shared_state->shared_from_this(),
                                                           
exception_catch_func);
     VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << 
_parent->node_id()
                << ", task id: " << state->task_id() << ", partition: " << 
partition_index
@@ -475,14 +480,12 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
         return Status::OK();
     }
 
+    spilled_stream->set_read_counters(profile());
     auto& blocks = _probe_blocks[partition_index];
 
     auto query_id = state->query_id();
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
 
-    auto read_func = [this, query_id, &spilled_stream, &blocks, submit_timer] {
-        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+    auto read_func = [this, query_id, &spilled_stream, &blocks] {
         SCOPED_TIMER(_recovery_probe_timer);
 
         vectorized::Block block;
@@ -518,7 +521,18 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
         }
     };
 
-    auto exception_catch_func = [read_func, query_id, this]() {
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+    auto exception_catch_func = [read_func, query_id, submit_timer, this]() {
+        auto submit_elapsed_time = submit_timer.elapsed_time();
+        _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
+        exec_time_counter()->update(submit_elapsed_time);
+        _spill_total_timer->update(submit_elapsed_time);
+
+        SCOPED_TIMER(exec_time_counter());
+        SCOPED_TIMER(_spill_total_timer);
+        SCOPED_TIMER(_spill_recover_time);
+
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel",
 {
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                     query_id, Status::InternalError("fault_inject 
partitioned_hash_join_probe "
@@ -549,7 +563,8 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
                                 "recovery_probe_blocks submit_func failed");
                     });
     return spill_io_pool->submit(std::make_shared<SpillRunnable>(
-            state, _shared_state->shared_from_this(), exception_catch_func));
+            state, _runtime_profile.get(), false, 
_shared_state->shared_from_this(),
+            exception_catch_func));
 }
 
 
PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool*
 pool,
@@ -957,6 +972,7 @@ Status 
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
                                                     bool* eos) {
     *eos = false;
     auto& local_state = get_local_state(state);
+    local_state.copy_shared_spill_profile();
     const auto need_to_spill = local_state._shared_state->need_to_spill;
 #ifndef NDEBUG
     Defer eos_check_defer([&] {
@@ -1017,6 +1033,7 @@ Status 
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
         }
 
         local_state.add_num_rows_returned(block->rows());
+        COUNTER_UPDATE(local_state._blocks_returned_counter, 1);
         if (*eos) {
             _update_profile_from_internal_states(local_state);
         }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index e66b730685b..6ecbdd01e49 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -109,11 +109,6 @@ private:
     RuntimeProfile::Counter* _recovery_probe_blocks = nullptr;
     RuntimeProfile::Counter* _recovery_probe_timer = nullptr;
 
-    RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
-    RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
-    RuntimeProfile::Counter* _spill_data_size = nullptr;
-    RuntimeProfile::Counter* _spill_block_count = nullptr;
-
     RuntimeProfile::Counter* _build_phase_label = nullptr;
     RuntimeProfile::Counter* _build_rows_counter = nullptr;
     RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index f7d38fe9d5d..8c9990da1ef 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -64,6 +64,7 @@ Status 
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
 Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
+    _shared_state->setup_shared_profile(_profile);
     RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
     for (uint32_t i = 0; i != p._partition_count; ++i) {
@@ -72,10 +73,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* 
state) {
                 state, spilling_stream, print_id(state->query_id()),
                 fmt::format("hash_build_sink_{}", i), _parent->node_id(),
                 std::numeric_limits<int32_t>::max(), 
std::numeric_limits<size_t>::max(), _profile));
-        RETURN_IF_ERROR(spilling_stream->prepare_spill());
-        spilling_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
-                                            _spill_data_size, 
_spill_write_disk_timer,
-                                            _spill_write_wait_io_timer, 
memory_used_counter());
+        spilling_stream->set_write_counters(_profile);
     }
     return p._partitioner->clone(state, _partitioner);
 }
@@ -117,6 +115,32 @@ size_t 
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
     return mem_size;
 }
 
+void PartitionedHashJoinSinkLocalState::update_memory_usage() {
+    if (!_shared_state->need_to_spill) {
+        if (_shared_state->inner_shared_state) {
+            auto* inner_sink_state_ = 
_shared_state->inner_runtime_state->get_sink_local_state();
+            if (inner_sink_state_) {
+                auto* inner_sink_state =
+                        
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+                COUNTER_SET(_memory_used_counter, 
inner_sink_state->_memory_used_counter->value());
+                COUNTER_SET(_peak_memory_usage_counter,
+                            inner_sink_state->_memory_used_counter->value());
+            }
+        }
+        return;
+    }
+
+    int64_t mem_size = 0;
+    auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+    for (auto& block : partitioned_blocks) {
+        if (block) {
+            mem_size += block->allocated_bytes();
+        }
+    }
+    COUNTER_SET(_memory_used_counter, mem_size);
+    COUNTER_SET(_peak_memory_usage_counter, mem_size);
+}
+
 size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
     size_t size_to_reserve = 0;
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
@@ -136,14 +160,21 @@ size_t 
PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
 Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    HashJoinBuildSinkLocalState* inner_sink_state {nullptr};
+    if (auto* tmp_sink_state = 
_shared_state->inner_runtime_state->get_sink_local_state()) {
+        inner_sink_state = 
assert_cast<HashJoinBuildSinkLocalState*>(tmp_sink_state);
+    }
     _shared_state->inner_shared_state->hash_table_variants.reset();
+    if (inner_sink_state) {
+        COUNTER_UPDATE(_memory_used_counter,
+                       -(inner_sink_state->_hash_table_memory_usage->value() +
+                         
inner_sink_state->_build_arena_memory_usage->value()));
+    }
     auto row_desc = p._child->row_desc();
     const auto num_slots = row_desc.num_slots();
     vectorized::Block build_block;
-    size_t block_old_mem = 0;
-    auto* inner_sink_state_ = 
_shared_state->inner_runtime_state->get_sink_local_state();
-    if (inner_sink_state_) {
-        auto* inner_sink_state = 
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+    int64_t block_old_mem = 0;
+    if (inner_sink_state) {
         build_block = inner_sink_state->_build_side_mutable_block.to_block();
         block_old_mem = build_block.allocated_bytes();
     }
@@ -158,12 +189,12 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
     }
 
     if (build_block.columns() > num_slots) {
-        build_block.erase(num_slots);
-        memory_used_counter()->update(build_block.allocated_bytes() - 
block_old_mem);
+        vectorized::Block::erase_useless_column(&build_block, num_slots);
+        COUNTER_UPDATE(_memory_used_counter, build_block.allocated_bytes() - 
block_old_mem);
     }
 
     auto spill_func = [build_block = std::move(build_block), state, this]() 
mutable {
-        Defer defer {[&]() { 
memory_used_counter()->set((int64_t)revocable_mem_size(state)); }};
+        Defer defer1 {[&]() { update_memory_usage(); }};
         auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
         auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
         std::vector<std::vector<uint32_t>> 
partitions_indexes(p._partition_count);
@@ -175,7 +206,6 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
         auto flush_rows = [&state, 
this](std::unique_ptr<vectorized::MutableBlock>& partition_block,
                                          vectorized::SpillStreamSPtr& 
spilling_stream) {
             auto block = partition_block->to_block();
-            Defer defer {[&]() { 
memory_used_counter()->update(-block.allocated_bytes()); }};
             auto status = spilling_stream->spill_block(state, block, false);
 
             if (!status.ok()) {
@@ -198,9 +228,11 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                 sub_block.get_by_position(i).column =
                         build_block.get_by_position(i).column->cut(offset, 
this_run);
             }
-            auto sub_blocks_memory_usage = sub_block.allocated_bytes();
-            memory_used_counter()->update(sub_blocks_memory_usage);
-            Defer defer {[&]() { 
memory_used_counter()->update(-sub_blocks_memory_usage); }};
+            int64_t sub_blocks_memory_usage = sub_block.allocated_bytes();
+            COUNTER_UPDATE(_memory_used_counter, sub_blocks_memory_usage);
+            COUNTER_SET(_peak_memory_usage_counter, 
_memory_used_counter->value());
+            Defer defer2 {
+                    [&]() { COUNTER_UPDATE(_memory_used_counter, 
-sub_blocks_memory_usage); }};
 
             offset += this_run;
             const auto is_last_block = offset == total_rows;
@@ -225,12 +257,11 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                     partition_block =
                             
vectorized::MutableBlock::create_unique(build_block.clone_empty());
                 }
-                auto old_mem = partition_block->allocated_bytes();
 
+                int64_t old_mem = partition_block->allocated_bytes();
                 {
                     SCOPED_TIMER(_partition_shuffle_timer);
                     Status st = partition_block->add_rows(&sub_block, begin, 
end);
-                    
memory_used_counter()->update(partition_block->allocated_bytes() - old_mem);
                     if (!st.ok()) {
                         std::unique_lock<std::mutex> lock(_spill_lock);
                         _spill_status = st;
@@ -240,6 +271,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                     }
                     partitions_indexes[partition_idx].clear();
                 }
+                int64_t new_mem = partition_block->allocated_bytes();
 
                 if (partition_block->rows() >= reserved_size || is_last_block) 
{
                     if (!flush_rows(partition_block, spilling_stream)) {
@@ -247,7 +279,10 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                     }
                     partition_block =
                             
vectorized::MutableBlock::create_unique(build_block.clone_empty());
-                    
memory_used_counter()->update(partition_block->allocated_bytes());
+                    COUNTER_UPDATE(_memory_used_counter, -new_mem);
+                } else {
+                    COUNTER_UPDATE(_memory_used_counter, new_mem - old_mem);
+                    COUNTER_SET(_peak_memory_usage_counter, 
_memory_used_counter->value());
                 }
             }
         }
@@ -255,8 +290,18 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
         _spill_dependency->set_ready();
     };
 
-    auto exception_catch_func = [spill_func, spill_context, this]() mutable {
-        SCOPED_TIMER(_spill_timer);
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+    auto exception_catch_func = [spill_func, spill_context, submit_timer, 
this]() mutable {
+        auto submit_elapsed_time = submit_timer.elapsed_time();
+        _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
+        exec_time_counter()->update(submit_elapsed_time);
+        _spill_total_timer->update(submit_elapsed_time);
+
+        SCOPED_TIMER(exec_time_counter());
+        SCOPED_TIMER(_spill_total_timer);
+        SCOPED_TIMER(_spill_write_timer);
+
         auto status = [&]() {
             RETURN_IF_CATCH_EXCEPTION(spill_func());
             return Status::OK();
@@ -274,8 +319,8 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
         }
     };
 
-    auto spill_runnable = std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
-                                                          
exception_catch_func);
+    auto spill_runnable = std::make_shared<SpillRunnable>(
+            state, _profile, true, _shared_state->shared_from_this(), 
exception_catch_func);
 
     auto* thread_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
 
@@ -291,6 +336,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
 
 Status PartitionedHashJoinSinkLocalState::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
+    SCOPED_TIMER(_spill_total_timer);
     VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", task: " << 
state->task_id()
                << " hash join sink " << _parent->node_id() << " revoke_memory"
                << ", eos: " << _child_eos;
@@ -307,6 +353,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
 
     auto query_id = state->query_id();
 
+    auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
+    DCHECK(spill_io_pool != nullptr);
+
     for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); 
++i) {
         vectorized::SpillStreamSPtr& spilling_stream = 
_shared_state->spilled_streams[i];
         auto& mutable_block = _shared_state->partitioned_build_blocks[i];
@@ -319,10 +368,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
 
         DCHECK(spilling_stream != nullptr);
 
-        auto* spill_io_pool =
-                
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
-        DCHECK(spill_io_pool != nullptr);
-
         MonotonicStopWatch submit_timer;
         submit_timer.start();
 
@@ -335,8 +380,17 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
         // so that when a stream finished, it should desc -1
         state->get_query_ctx()->increase_revoking_tasks_count();
         auto spill_runnable = std::make_shared<SpillRunnable>(
-                state, _shared_state->shared_from_this(),
+                state, _profile, true, _shared_state->shared_from_this(),
                 [this, query_id, spilling_stream, i, submit_timer, 
spill_context] {
+                    auto submit_elapsed_time = submit_timer.elapsed_time();
+                    
_spill_write_wait_in_queue_timer->update(submit_elapsed_time);
+                    exec_time_counter()->update(submit_elapsed_time);
+                    _spill_total_timer->update(submit_elapsed_time);
+
+                    SCOPED_TIMER(exec_time_counter());
+                    SCOPED_TIMER(_spill_total_timer);
+                    SCOPED_TIMER(_spill_write_timer);
+
                     DBUG_EXECUTE_IF(
                             
"fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
                                 
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
@@ -345,7 +399,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
                                                           "revoke_memory 
canceled"));
                                 return;
                             });
-                    
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
                     SCOPED_TIMER(_spill_build_timer);
 
                     auto status = [&]() {
@@ -411,7 +464,7 @@ Status 
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
     if (!rows) {
         return Status::OK();
     }
-    Defer defer {[&]() { 
memory_used_counter()->set((int64_t)revocable_mem_size(state)); }};
+    Defer defer {[&]() { update_memory_usage(); }};
     {
         /// TODO: DO NOT execute build exprs twice(when partition and building 
hash table)
         SCOPED_TIMER(_partition_timer);
@@ -455,10 +508,9 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
 
     if (_spill_status_ok) {
         auto block = partitioned_block->to_block();
-        auto block_mem_usage = block.allocated_bytes();
-        Defer defer {[&]() { memory_used_counter()->update(-block_mem_usage); 
}};
+        int64_t block_mem_usage = block.allocated_bytes();
+        Defer defer {[&]() { COUNTER_UPDATE(memory_used_counter(), 
-block_mem_usage); }};
         partitioned_block = 
vectorized::MutableBlock::create_unique(block.clone_empty());
-        memory_used_counter()->update(partitioned_block->allocated_bytes());
         auto st = spilling_stream->spill_block(state(), block, false);
         if (!st.ok()) {
             _spill_status_ok = false;
@@ -609,10 +661,7 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                             "fault_inject partitioned_hash_join_sink "
                             "sink_eos failed");
                 });
-                Defer defer {[&]() {
-                    local_state.memory_used_counter()->set(
-                            (int64_t)local_state.revocable_mem_size(state));
-                }};
+                Defer defer {[&]() { local_state.update_memory_usage(); }};
                 RETURN_IF_ERROR(_inner_sink_operator->sink(
                         local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
                 VLOG_DEBUG << "hash join sink " << node_id() << " sink eos, 
set_ready_to_read"
@@ -650,9 +699,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
                     "fault_inject partitioned_hash_join_sink "
                     "sink failed");
         });
-        Defer defer {[&]() {
-            
local_state.memory_used_counter()->set((int64_t)local_state.revocable_mem_size(state));
-        }};
+        Defer defer {[&]() { local_state.update_memory_usage(); }};
         RETURN_IF_ERROR(_inner_sink_operator->sink(
                 local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index d3725997882..aaa6d64adf9 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -47,6 +47,7 @@ public:
     Status revoke_memory(RuntimeState* state, const 
std::shared_ptr<SpillContext>& spill_context);
     size_t revocable_mem_size(RuntimeState* state) const;
     [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
+    void update_memory_usage();
 
 protected:
     PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, 
RuntimeState* state)
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 38e7a173197..5d8355b865d 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -50,6 +50,13 @@ Status SpillSortSinkLocalState::init(doris::RuntimeState* 
state,
     return Status::OK();
 }
 
+Status SpillSortSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_open_timer);
+    _shared_state->setup_shared_profile(_profile);
+    return Base::open(state);
+}
+
 void SpillSortSinkLocalState::_init_counters() {
     _internal_runtime_profile = 
std::make_unique<RuntimeProfile>("internal_profile");
 
@@ -57,10 +64,7 @@ void SpillSortSinkLocalState::_init_counters() {
     _merge_block_timer = ADD_TIMER(_profile, "MergeBlockTime");
     _sort_blocks_memory_usage =
             ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", 
TUnit::BYTES, 1);
-
     _spill_merge_sort_timer = ADD_TIMER_WITH_LEVEL(_profile, 
"SpillMergeSortTime", 1);
-
-    _spill_wait_in_queue_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"SpillWaitInQueueTime", 1);
 }
 #define UPDATE_PROFILE(counter, name)                           \
     do {                                                        \
@@ -201,13 +205,8 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
             _shared_state->spill_block_batch_row_count,
             SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile());
     RETURN_IF_ERROR(status);
+    _spilling_stream->set_write_counters(_profile);
 
-    _spilling_stream->set_write_counters(
-            Base::_spill_serialize_block_timer, Base::_spill_block_count, 
Base::_spill_data_size,
-            Base::_spill_write_disk_timer, Base::_spill_write_wait_io_timer, 
memory_used_counter());
-
-    status = _spilling_stream->prepare_spill();
-    RETURN_IF_ERROR(status);
     _shared_state->sorted_streams.emplace_back(_spilling_stream);
 
     auto& parent = Base::_parent->template cast<Parent>();
@@ -218,11 +217,7 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
     }
     auto query_id = state->query_id();
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-
-    auto spill_func = [this, state, query_id, &parent, submit_timer] {
-        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+    auto spill_func = [this, state, query_id, &parent] {
         Defer defer {[&]() {
             if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
                 if (!_shared_state->sink_status.ok()) {
@@ -267,10 +262,7 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
                         &eos);
             }
             RETURN_IF_ERROR(_shared_state->sink_status);
-            {
-                SCOPED_TIMER(Base::_spill_timer);
-                _shared_state->sink_status = 
_spilling_stream->spill_block(state, block, eos);
-            }
+            _shared_state->sink_status = _spilling_stream->spill_block(state, 
block, eos);
             RETURN_IF_ERROR(_shared_state->sink_status);
             block.clear_column_data();
         }
@@ -279,7 +271,19 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
         return Status::OK();
     };
 
-    auto exception_catch_func = [this, query_id, spill_context, spill_func]() {
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
+    auto exception_catch_func = [this, query_id, spill_context, submit_timer, 
spill_func]() {
+        auto submit_elapsed_time = submit_timer.elapsed_time();
+        _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
+        exec_time_counter()->update(submit_elapsed_time);
+        _spill_total_timer->update(submit_elapsed_time);
+
+        SCOPED_TIMER(exec_time_counter());
+        SCOPED_TIMER(_spill_total_timer);
+        SCOPED_TIMER(_spill_write_timer);
+
         DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", 
{
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                     query_id, Status::InternalError("fault_inject 
spill_sort_sink "
@@ -304,7 +308,8 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state,
     if (status.ok()) {
         state->get_query_ctx()->increase_revoking_tasks_count();
         status = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
-                std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
+                std::make_shared<SpillRunnable>(state, _profile, true,
+                                                
_shared_state->shared_from_this(),
                                                 exception_catch_func));
     }
     if (!status.ok()) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 7a361199239..086d93a970c 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -36,6 +36,7 @@ public:
     ~SpillSortSinkLocalState() override = default;
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
     Dependency* finishdependency() override { return _finish_dependency.get(); 
}
 
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index fe1356381b2..5cc124caaea 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -36,6 +36,7 @@ SpillSortLocalState::SpillSortLocalState(RuntimeState* state, OperatorXBase* par
 }
 Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
+    init_spill_write_counters();
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
 
@@ -45,14 +46,6 @@ Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) {
 
     _internal_runtime_profile = 
std::make_unique<RuntimeProfile>("internal_profile");
     _spill_merge_sort_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillMergeSortTime", 1);
-    _spill_serialize_block_timer =
-            ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", 
1);
-    _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteDiskTime", 1);
-    _spill_data_size =
-            ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", 
TUnit::BYTES, 1);
-    _spill_block_count =
-            ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", 
TUnit::UNIT, 1);
-    _spill_wait_in_queue_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"SpillWaitInQueueTime", 1);
     return Status::OK();
 }
 
@@ -86,11 +79,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
 
     auto query_id = state->query_id();
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-
-    auto spill_func = [this, state, query_id, &parent, submit_timer] {
-        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+    auto spill_func = [this, state, query_id, &parent] {
         SCOPED_TIMER(_spill_merge_sort_timer);
         Defer defer {[&]() {
             if (!_status.ok() || state->is_cancelled()) {
@@ -136,15 +125,11 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                         _shared_state->spill_block_batch_row_count,
                         SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, 
profile());
                 RETURN_IF_ERROR(_status);
-                _status = tmp_stream->prepare_spill();
-                RETURN_IF_ERROR(_status);
+                tmp_stream->set_write_counters(profile());
 
                 _shared_state->sorted_streams.emplace_back(tmp_stream);
 
                 bool eos = false;
-                tmp_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
-                                               _spill_data_size, 
_spill_write_disk_timer,
-                                               _spill_write_wait_io_timer, 
memory_used_counter());
                 while (!eos && !state->is_cancelled()) {
                     merge_sorted_block.clear_column_data();
                     {
@@ -178,7 +163,19 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
         return Status::OK();
     };
 
-    auto exception_catch_func = [this, spill_func]() {
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
+    auto exception_catch_func = [this, spill_func, submit_timer]() {
+        auto submit_elapsed_time = submit_timer.elapsed_time();
+        _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
+        exec_time_counter()->update(submit_elapsed_time);
+        _spill_total_timer->update(submit_elapsed_time);
+
+        SCOPED_TIMER(exec_time_counter());
+        SCOPED_TIMER(_spill_total_timer);
+        SCOPED_TIMER(_spill_recover_time);
+
         _status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); 
}();
     };
 
@@ -188,7 +185,8 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                 "merge_sort_spill_data submit_func failed");
     });
     return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
-            std::make_shared<SpillRunnable>(state, 
_shared_state->shared_from_this(),
+            std::make_shared<SpillRunnable>(state, _runtime_profile.get(), 
false,
+                                            _shared_state->shared_from_this(),
                                             exception_catch_func));
 }
 
@@ -203,8 +201,7 @@ Status SpillSortLocalState::_create_intermediate_merger(
     _current_merging_streams.clear();
     for (int i = 0; i < num_blocks && !_shared_state->sorted_streams.empty(); 
++i) {
         auto stream = _shared_state->sorted_streams.front();
-        stream->set_read_counters(Base::_spill_read_data_time, 
Base::_spill_deserialize_time,
-                                  Base::_spill_read_bytes, 
Base::_spill_read_wait_io_timer);
+        stream->set_read_counters(profile());
         _current_merging_streams.emplace_back(stream);
         child_block_suppliers.emplace_back(
                 
std::bind(std::mem_fn(&vectorized::SpillStream::read_next_block_sync), 
stream.get(),
@@ -263,6 +260,7 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state) {
 Status SpillSortSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                            bool* eos) {
     auto& local_state = get_local_state(state);
+    local_state.copy_shared_spill_profile();
     Defer defer {[&]() {
         if (!local_state._status.ok() || *eos) {
             local_state._shared_state->close();
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h
index ca984e352fc..7536dd15e92 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -65,10 +65,6 @@ protected:
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
     // counters for spill merge sort
     RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;
-    RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
-    RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
-    RuntimeProfile::Counter* _spill_data_size = nullptr;
-    RuntimeProfile::Counter* _spill_block_count = nullptr;
 };
 class SortSourceOperatorX;
 class SpillSortSourceOperatorX : public OperatorX<SpillSortLocalState> {
diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h
index d2b157463ae..2ea5cedcdb0 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -29,6 +29,7 @@
 #include "runtime/runtime_state.h"
 #include "runtime/task_execution_context.h"
 #include "runtime/thread_context.h"
+#include "util/runtime_profile.h"
 #include "util/threadpool.h"
 #include "vec/runtime/partitioner.h"
 
@@ -78,12 +79,23 @@ private:
 
 class SpillRunnable : public Runnable {
 public:
-    SpillRunnable(RuntimeState* state, const 
std::shared_ptr<BasicSharedState>& shared_state,
-                  std::function<void()> func)
+    SpillRunnable(RuntimeState* state, RuntimeProfile* profile, bool is_write,
+                  const std::shared_ptr<BasicSharedState>& shared_state, 
std::function<void()> func)
             : _state(state),
+              _is_write(is_write),
               _task_context_holder(state->get_task_execution_context()),
               _shared_state_holder(shared_state),
-              _func(std::move(func)) {}
+              _func(std::move(func)) {
+        write_wait_in_queue_task_count = 
profile->get_counter("SpillWriteTaskWaitInQueueCount");
+        writing_task_count = profile->get_counter("SpillWriteTaskCount");
+        read_wait_in_queue_task_count = 
profile->get_counter("SpillReadTaskWaitInQueueCount");
+        reading_task_count = profile->get_counter("SpillReadTaskCount");
+        if (is_write) {
+            COUNTER_UPDATE(write_wait_in_queue_task_count, 1);
+        } else {
+            COUNTER_UPDATE(read_wait_in_queue_task_count, 1);
+        }
+    }
 
     ~SpillRunnable() override = default;
 
@@ -94,8 +106,20 @@ public:
         if (!task_context_holder) {
             return;
         }
+        if (_is_write) {
+            COUNTER_UPDATE(write_wait_in_queue_task_count, -1);
+            COUNTER_UPDATE(writing_task_count, 1);
+        } else {
+            COUNTER_UPDATE(read_wait_in_queue_task_count, -1);
+            COUNTER_UPDATE(reading_task_count, 1);
+        }
         SCOPED_ATTACH_TASK(_state);
         Defer defer([&] {
+            if (_is_write) {
+                COUNTER_UPDATE(writing_task_count, -1);
+            } else {
+                COUNTER_UPDATE(reading_task_count, -1);
+            }
             std::function<void()> tmp;
             std::swap(tmp, _func);
         });
@@ -113,6 +137,11 @@ public:
 
 private:
     RuntimeState* _state;
+    bool _is_write;
+    RuntimeProfile::Counter* write_wait_in_queue_task_count = nullptr;
+    RuntimeProfile::Counter* writing_task_count = nullptr;
+    RuntimeProfile::Counter* read_wait_in_queue_task_count = nullptr;
+    RuntimeProfile::Counter* reading_task_count = nullptr;
     std::weak_ptr<TaskExecutionContext> _task_context_holder;
     std::weak_ptr<BasicSharedState> _shared_state_holder;
     std::function<void()> _func;
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index e127565b15f..d69c43dfc46 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -95,7 +95,6 @@ Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     _build_timer = ADD_TIMER(Base::profile(), "BuildTime");
     _build_table_convert_timer = ADD_TIMER(Base::profile(), 
"BuildConvertToPartitionedTime");
     _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
-    _exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
     _merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
     _expr_timer = ADD_TIMER(Base::profile(), "ExprTime");
     _serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime");
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index f1804601693..5ed3509ae78 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -548,9 +548,11 @@ public:
     [[nodiscard]] Status merge_impl_ignore_overflow(T&& block) {
         if (_columns.size() != block.columns()) {
             return Status::Error<ErrorCode::INTERNAL_ERROR>(
-                    "Merge block not match, self:[columns: {}, types: {}], 
input:[columns: {}, "
+                    "Merge block not match, self column count: {}, [columns: 
{}, types: {}], "
+                    "input column count: {}, [columns: {}, "
                     "types: {}], ",
-                    dump_names(), dump_types(), block.dump_names(), 
block.dump_types());
+                    _columns.size(), dump_names(), dump_types(), 
block.columns(),
+                    block.dump_names(), block.dump_types());
         }
         for (int i = 0; i < _columns.size(); ++i) {
             DCHECK(_data_types[i]->equals(*block.get_by_position(i).type))
@@ -583,9 +585,11 @@ public:
         } else {
             if (_columns.size() != block.columns()) {
                 return Status::Error<ErrorCode::INTERNAL_ERROR>(
-                        "Merge block not match, self:[columns: {}, types: {}], 
input:[columns: {}, "
+                        "Merge block not match, self column count: {}, 
[columns: {}, types: {}], "
+                        "input column count: {}, [columns: {}, "
                         "types: {}], ",
-                        dump_names(), dump_types(), block.dump_names(), 
block.dump_types());
+                        _columns.size(), dump_names(), dump_types(), 
block.columns(),
+                        block.dump_names(), block.dump_types());
             }
             for (int i = 0; i < _columns.size(); ++i) {
                 if (!_data_types[i]->equals(*block.get_by_position(i).type)) {
diff --git a/be/src/vec/spill/spill_reader.cpp b/be/src/vec/spill/spill_reader.cpp
index f0320ee9b49..d0258344541 100644
--- a/be/src/vec/spill/spill_reader.cpp
+++ b/be/src/vec/spill/spill_reader.cpp
@@ -37,7 +37,9 @@ Status SpillReader::open() {
         return Status::OK();
     }
 
-    SCOPED_TIMER(read_timer_);
+    SCOPED_TIMER(_read_file_timer);
+
+    COUNTER_UPDATE(_read_file_count, 1);
 
     RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_path_, 
&file_reader_));
 
@@ -50,12 +52,14 @@ Status SpillReader::open() {
     size_t bytes_read = 0;
     RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result, 
&bytes_read));
     DCHECK(bytes_read == 8); // max_sub_block_size, block count
+    COUNTER_UPDATE(_read_file_size, bytes_read);
 
     // read max sub block size
     bytes_read = 0;
     result.data = (char*)&max_sub_block_size_;
     RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2, 
result, &bytes_read));
     DCHECK(bytes_read == 8); // max_sub_block_size, block count
+    COUNTER_UPDATE(_read_file_size, bytes_read);
 
     size_t buff_size = std::max(block_count_ * sizeof(size_t), 
max_sub_block_size_);
     try {
@@ -73,6 +77,7 @@ Status SpillReader::open() {
 
     RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read));
     DCHECK(bytes_read == block_count_ * sizeof(size_t));
+    COUNTER_UPDATE(_read_file_size, bytes_read);
 
     block_start_offsets_.resize(block_count_ + 1);
     for (size_t i = 0; i < block_count_; ++i) {
@@ -103,21 +108,24 @@ Status SpillReader::read(Block* block, bool* eos) {
     Slice result(read_buff_.get(), bytes_to_read);
     size_t bytes_read = 0;
     {
-        SCOPED_TIMER(read_timer_);
+        SCOPED_TIMER(_read_file_timer);
         
RETURN_IF_ERROR(file_reader_->read_at(block_start_offsets_[read_block_index_], 
result,
                                               &bytes_read));
     }
     DCHECK(bytes_read == bytes_to_read);
-    COUNTER_UPDATE(read_bytes_, bytes_read);
 
     if (bytes_read > 0) {
+        COUNTER_UPDATE(_read_file_size, bytes_read);
+        COUNTER_UPDATE(_read_block_count, 1);
         {
-            SCOPED_TIMER(deserialize_timer_);
+            SCOPED_TIMER(_deserialize_timer);
             if (!pb_block_.ParseFromArray(result.data, result.size)) {
                 return Status::InternalError("Failed to read spilled block");
             }
             RETURN_IF_ERROR(block->deserialize(pb_block_));
         }
+        COUNTER_UPDATE(_read_block_data_size, block->bytes());
+        COUNTER_UPDATE(_read_rows_count, block->rows());
     } else {
         block->clear_column_data();
     }
diff --git a/be/src/vec/spill/spill_reader.h b/be/src/vec/spill/spill_reader.h
index 6694bf91572..fcb7d8d9e0b 100644
--- a/be/src/vec/spill/spill_reader.h
+++ b/be/src/vec/spill/spill_reader.h
@@ -50,12 +50,14 @@ public:
 
     size_t block_count() const { return block_count_; }
 
-    void set_counters(RuntimeProfile::Counter* read_timer,
-                      RuntimeProfile::Counter* deserialize_timer,
-                      RuntimeProfile::Counter* read_bytes) {
-        read_timer_ = read_timer;
-        deserialize_timer_ = deserialize_timer;
-        read_bytes_ = read_bytes;
+    void set_counters(RuntimeProfile* profile) {
+        _read_file_timer = profile->get_counter("SpillReadFileTime");
+        _deserialize_timer = 
profile->get_counter("SpillReadDerializeBlockTime");
+        _read_block_count = profile->get_counter("SpillReadBlockCount");
+        _read_block_data_size = profile->get_counter("SpillReadBlockDataSize");
+        _read_file_size = profile->get_counter("SpillReadFileSize");
+        _read_rows_count = profile->get_counter("SpillReadRows");
+        _read_file_count = profile->get_counter("SpillReadFileCount");
     }
 
 private:
@@ -71,9 +73,13 @@ private:
 
     PBlock pb_block_;
 
-    RuntimeProfile::Counter* read_timer_;
-    RuntimeProfile::Counter* deserialize_timer_;
-    RuntimeProfile::Counter* read_bytes_;
+    RuntimeProfile::Counter* _read_file_timer = nullptr;
+    RuntimeProfile::Counter* _deserialize_timer = nullptr;
+    RuntimeProfile::Counter* _read_block_count = nullptr;
+    RuntimeProfile::Counter* _read_block_data_size = nullptr;
+    RuntimeProfile::Counter* _read_file_size = nullptr;
+    RuntimeProfile::Counter* _read_rows_count = nullptr;
+    RuntimeProfile::Counter* _read_file_count = nullptr;
 };
 
 using SpillReaderUPtr = std::unique_ptr<SpillReader>;
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index 7189fad262c..6f9143b8073 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -44,16 +44,31 @@ SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* d
           batch_rows_(batch_rows),
           batch_bytes_(batch_bytes),
           query_id_(state->query_id()),
-          profile_(profile) {}
+          profile_(profile) {
+    _total_file_count = profile_->get_counter("SpillWriteFileTotalCount");
+    _current_file_count = profile_->get_counter("SpillWriteFileCurrentCount");
+    _current_file_size = profile_->get_counter("SpillWriteFileCurrentSize");
+}
+
+void SpillStream::update_shared_profiles(RuntimeProfile* source_op_profile) {
+    _current_file_count = 
source_op_profile->get_counter("SpillWriteFileCurrentCount");
+    _current_file_size = 
source_op_profile->get_counter("SpillWriteFileCurrentSize");
+}
 
 SpillStream::~SpillStream() {
     gc();
 }
 
 void SpillStream::gc() {
+    if (_current_file_size) {
+        COUNTER_UPDATE(_current_file_size, -total_written_bytes_);
+    }
     bool exists = false;
     auto status = io::global_local_filesystem()->exists(spill_dir_, &exists);
     if (status.ok() && exists) {
+        if (_current_file_count) {
+            COUNTER_UPDATE(_current_file_count, -1);
+        }
         auto query_gc_dir = 
data_dir_->get_spill_data_gc_path(print_id(query_id_));
         status = io::global_local_filesystem()->create_directory(query_gc_dir);
         DBUG_EXECUTE_IF("fault_inject::spill_stream::gc", {
@@ -79,10 +94,19 @@ void SpillStream::gc() {
 }
 
 Status SpillStream::prepare() {
-    writer_ = std::make_unique<SpillWriter>(stream_id_, batch_rows_, 
data_dir_, spill_dir_);
+    writer_ =
+            std::make_unique<SpillWriter>(profile_, stream_id_, batch_rows_, 
data_dir_, spill_dir_);
 
     reader_ = std::make_unique<SpillReader>(stream_id_, 
writer_->get_file_path());
-    return Status::OK();
+
+    DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", {
+        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream 
prepare_spill failed");
+    });
+    COUNTER_UPDATE(_total_file_count, 1);
+    if (_current_file_count) {
+        COUNTER_UPDATE(_current_file_count, 1);
+    }
+    return writer_->open();
 }
 
 const TUniqueId& SpillStream::query_id() const {
@@ -92,12 +116,6 @@ const TUniqueId& SpillStream::query_id() const {
 const std::string& SpillStream::get_spill_root_dir() const {
     return data_dir_->path();
 }
-Status SpillStream::prepare_spill() {
-    DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", {
-        return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream 
prepare_spill failed");
-    });
-    return writer_->open();
-}
 
 Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool 
eof) {
     size_t written_bytes = 0;
@@ -106,9 +124,7 @@ Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eo
     });
     RETURN_IF_ERROR(writer_->write(state, block, written_bytes));
     if (eof) {
-        RETURN_IF_ERROR(writer_->close());
-        total_written_bytes_ = writer_->get_written_bytes();
-        writer_.reset();
+        RETURN_IF_ERROR(spill_eof());
     } else {
         total_written_bytes_ = writer_->get_written_bytes();
     }
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 26b7dcbaf06..5be151be72c 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -56,32 +56,17 @@ public:
 
     int64_t get_written_bytes() const { return total_written_bytes_; }
 
-    Status prepare_spill();
-
     Status spill_block(RuntimeState* state, const Block& block, bool eof);
 
     Status spill_eof();
 
     Status read_next_block_sync(Block* block, bool* eos);
 
-    void set_write_counters(RuntimeProfile::Counter* serialize_timer,
-                            RuntimeProfile::Counter* write_block_counter,
-                            RuntimeProfile::Counter* write_bytes_counter,
-                            RuntimeProfile::Counter* write_timer,
-                            RuntimeProfile::Counter* wait_io_timer,
-                            RuntimeProfile::Counter* memory_used_counter) {
-        writer_->set_counters(serialize_timer, write_block_counter, 
write_bytes_counter,
-                              write_timer, memory_used_counter);
-        write_wait_io_timer_ = wait_io_timer;
-    }
-
-    void set_read_counters(RuntimeProfile::Counter* read_timer,
-                           RuntimeProfile::Counter* deserialize_timer,
-                           RuntimeProfile::Counter* read_bytes,
-                           RuntimeProfile::Counter* wait_io_timer) {
-        reader_->set_counters(read_timer, deserialize_timer, read_bytes);
-        read_wait_io_timer_ = wait_io_timer;
-    }
+    void set_write_counters(RuntimeProfile* profile) { 
writer_->set_counters(profile); }
+
+    void set_read_counters(RuntimeProfile* profile) { 
reader_->set_counters(profile); }
+
+    void update_shared_profiles(RuntimeProfile* source_op_profile);
 
     const TUniqueId& query_id() const;
 
@@ -93,6 +78,8 @@ private:
     RuntimeState* state_ = nullptr;
     int64_t stream_id_;
     SpillDataDir* data_dir_ = nullptr;
+    // Directory path format specified in SpillStreamManager::register_spill_stream:
+    // storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id
     std::string spill_dir_;
     size_t batch_rows_;
     size_t batch_bytes_;
@@ -106,8 +93,9 @@ private:
     TUniqueId query_id_;
 
     RuntimeProfile* profile_ = nullptr;
-    RuntimeProfile::Counter* write_wait_io_timer_ = nullptr;
-    RuntimeProfile::Counter* read_wait_io_timer_ = nullptr;
+    RuntimeProfile::Counter* _current_file_count = nullptr;
+    RuntimeProfile::Counter* _total_file_count = nullptr;
+    RuntimeProfile::Counter* _current_file_size = nullptr;
 };
 using SpillStreamSPtr = std::shared_ptr<SpillStream>;
 } // namespace vectorized
diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp
index 61e96559d23..c4641866dcf 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -168,6 +168,7 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea
     SpillDataDir* data_dir = nullptr;
     for (auto& dir : data_dirs) {
         std::string spill_root_dir = dir->get_spill_data_path();
+        // storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id
         spill_dir = fmt::format("{}/{}/{}-{}-{}-{}", spill_root_dir, query_id, 
operator_name,
                                 node_id, state->task_id(), id);
         auto st = io::global_local_filesystem()->create_directory(spill_dir);
diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp
index 9fbd81601b6..34715787756 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -45,12 +45,12 @@ Status SpillWriter::close() {
 
     // meta: block1 offset, block2 offset, ..., blockn offset, max_sub_block_size, n
     {
-        SCOPED_TIMER(write_timer_);
+        SCOPED_TIMER(_write_file_timer);
         RETURN_IF_ERROR(file_writer_->append(meta_));
     }
 
     total_written_bytes_ += meta_.size();
-    COUNTER_UPDATE(write_bytes_counter_, meta_.size());
+    COUNTER_UPDATE(_write_file_data_bytes_counter, meta_.size());
 
     data_dir_->update_spill_data_usage(meta_.size());
 
@@ -64,6 +64,8 @@ Status SpillWriter::write(RuntimeState* state, const Block& block, size_t& writt
     written_bytes = 0;
     DCHECK(file_writer_);
     auto rows = block.rows();
+    COUNTER_UPDATE(_write_rows_counter, rows);
+    COUNTER_UPDATE(_write_block_bytes_counter, block.bytes());
     // file format: block1, block2, ..., blockn, meta
     if (rows <= batch_size_) {
         return _write_internal(block, written_bytes);
@@ -84,9 +86,10 @@ Status SpillWriter::write(RuntimeState* state, const Block& block, size_t& writt
                 }
             });
 
-            auto tmp_blcok_mem = tmp_block.allocated_bytes();
-            memory_used_counter_->update(tmp_blcok_mem);
-            Defer defer {[&]() { memory_used_counter_->update(-tmp_blcok_mem); 
}};
+            int64_t tmp_blcok_mem = tmp_block.allocated_bytes();
+            COUNTER_UPDATE(_memory_used_counter, tmp_blcok_mem);
+            COUNTER_SET(_peak_memory_usage_counter, 
_memory_used_counter->value());
+            Defer defer {[&]() { COUNTER_UPDATE(_memory_used_counter, 
-tmp_blcok_mem); }};
             RETURN_IF_ERROR(_write_internal(tmp_block, written_bytes));
 
             row_idx += block_rows;
@@ -100,26 +103,31 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) {
 
     Status status;
     std::string buff;
+    int64_t buff_size {0};
 
     if (block.rows() > 0) {
         {
             PBlock pblock;
-            SCOPED_TIMER(serialize_timer_);
+            SCOPED_TIMER(_serialize_timer);
             status = block.serialize(
                     BeExecVersionManager::get_newest_version(), &pblock, 
&uncompressed_bytes,
                     &compressed_bytes,
                     segment_v2::CompressionTypePB::ZSTD); // ZSTD for better 
compression ratio
             RETURN_IF_ERROR(status);
-            auto pblock_mem = pblock.ByteSizeLong();
-            memory_used_counter_->update(pblock_mem);
-            Defer defer {[&]() { memory_used_counter_->update(-pblock_mem); }};
+            int64_t pblock_mem = pblock.ByteSizeLong();
+            COUNTER_UPDATE(_memory_used_counter, pblock_mem);
+            COUNTER_SET(_peak_memory_usage_counter, 
_memory_used_counter->value());
+            Defer defer {[&]() { COUNTER_UPDATE(_memory_used_counter, 
-pblock_mem); }};
             if (!pblock.SerializeToString(&buff)) {
                 return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
                         "serialize spill data error. [path={}]", file_path_);
             }
-            memory_used_counter_->update(buff.size());
+            buff_size = buff.size();
+            COUNTER_UPDATE(_memory_used_counter, buff_size);
+            COUNTER_SET(_peak_memory_usage_counter, 
_memory_used_counter->value());
+            Defer defer2 {[&]() { COUNTER_UPDATE(_memory_used_counter, 
-buff_size); }};
         }
-        if (data_dir_->reach_capacity_limit(buff.size())) {
+        if (data_dir_->reach_capacity_limit(buff_size)) {
             return Status::Error<ErrorCode::DISK_REACH_CAPACITY_LIMIT>(
                     "spill data total size exceed limit, path: {}, size limit: 
{}, spill data "
                     "size: {}",
@@ -129,31 +137,29 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) {
         }
 
         {
-            auto buff_size = buff.size();
             Defer defer {[&]() {
-                memory_used_counter_->update(-buff_size);
                 if (status.ok()) {
                     data_dir_->update_spill_data_usage(buff_size);
 
                     written_bytes += buff_size;
-                    max_sub_block_size_ = std::max(max_sub_block_size_, 
buff_size);
+                    max_sub_block_size_ = std::max(max_sub_block_size_, 
(size_t)buff_size);
 
                     meta_.append((const char*)&total_written_bytes_, 
sizeof(size_t));
-                    COUNTER_UPDATE(write_bytes_counter_, buff_size);
-                    COUNTER_UPDATE(write_block_counter_, 1);
+                    COUNTER_UPDATE(_write_file_data_bytes_counter, buff_size);
+                    COUNTER_UPDATE(_write_block_counter, 1);
                     total_written_bytes_ += buff_size;
                     ++written_blocks_;
                 }
             }};
             {
-                SCOPED_TIMER(write_timer_);
+                SCOPED_TIMER(_write_file_timer);
                 status = file_writer_->append(buff);
                 RETURN_IF_ERROR(status);
             }
         }
     }
 
-    return Status::OK();
+    return status;
 }
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h
index c3502b5d6a4..a6ea3200b14 100644
--- a/be/src/vec/spill/spill_writer.h
+++ b/be/src/vec/spill/spill_writer.h
@@ -31,9 +31,16 @@ namespace vectorized {
 class SpillDataDir;
 class SpillWriter {
 public:
-    SpillWriter(int64_t id, size_t batch_size, SpillDataDir* data_dir, const 
std::string& dir)
+    SpillWriter(RuntimeProfile* profile, int64_t id, size_t batch_size, 
SpillDataDir* data_dir,
+                const std::string& dir)
             : data_dir_(data_dir), stream_id_(id), batch_size_(batch_size) {
-        file_path_ = dir + "/" + std::to_string(file_index_);
+        // Directory path format specified in SpillStreamManager::register_spill_stream:
+        // storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id/0
+        file_path_ = dir + "/0";
+
+        _memory_used_counter = profile->get_counter("MemoryUsage");
+        _peak_memory_usage_counter = 
static_cast<RuntimeProfile::HighWaterMarkCounter*>(
+                profile->get_counter("MemoryUsagePeak"));
     }
 
     Status open();
@@ -48,21 +55,16 @@ public:
 
     const std::string& get_file_path() const { return file_path_; }
 
-    void set_counters(RuntimeProfile::Counter* serialize_timer,
-                      RuntimeProfile::Counter* write_block_counter,
-                      RuntimeProfile::Counter* write_bytes_counter,
-                      RuntimeProfile::Counter* write_timer,
-                      RuntimeProfile::Counter* memory_used_counter) {
-        serialize_timer_ = serialize_timer;
-        write_block_counter_ = write_block_counter;
-        write_bytes_counter_ = write_bytes_counter;
-        write_timer_ = write_timer;
-        memory_used_counter_ = memory_used_counter;
+    void set_counters(RuntimeProfile* profile) {
+        _write_file_timer = profile->get_counter("SpillWriteFileTime");
+        _serialize_timer = 
profile->get_counter("SpillWriteSerializeBlockTime");
+        _write_block_counter = profile->get_counter("SpillWriteBlockCount");
+        _write_block_bytes_counter = 
profile->get_counter("SpillWriteBlockDataSize");
+        _write_file_data_bytes_counter = 
profile->get_counter("SpillWriteFileTotalSize");
+        _write_rows_counter = profile->get_counter("SpillWriteRows");
     }
 
 private:
-    void _init_profile();
-
     Status _write_internal(const Block& block, size_t& written_bytes);
 
     // not owned, point to the data dir of this rowset
@@ -72,7 +74,6 @@ private:
     int64_t stream_id_;
     size_t batch_size_;
     size_t max_sub_block_size_ = 0;
-    int file_index_ = 0;
     std::string file_path_;
     std::unique_ptr<doris::io::FileWriter> file_writer_;
 
@@ -80,11 +81,14 @@ private:
     int64_t total_written_bytes_ = 0;
     std::string meta_;
 
-    RuntimeProfile::Counter* write_bytes_counter_ = nullptr;
-    RuntimeProfile::Counter* serialize_timer_ = nullptr;
-    RuntimeProfile::Counter* write_timer_ = nullptr;
-    RuntimeProfile::Counter* write_block_counter_ = nullptr;
-    RuntimeProfile::Counter* memory_used_counter_ = nullptr;
+    RuntimeProfile::Counter* _write_file_timer = nullptr;
+    RuntimeProfile::Counter* _serialize_timer = nullptr;
+    RuntimeProfile::Counter* _write_block_counter = nullptr;
+    RuntimeProfile::Counter* _write_block_bytes_counter = nullptr;
+    RuntimeProfile::Counter* _write_file_data_bytes_counter = nullptr;
+    RuntimeProfile::Counter* _write_rows_counter = nullptr;
+    RuntimeProfile::Counter* _memory_used_counter = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
 };
 using SpillWriterUPtr = std::unique_ptr<SpillWriter>;
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to