This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new b6b785f99bd improve spill counters (#41724) b6b785f99bd is described below commit b6b785f99bde491e1a74e7dfcbc40854e581aa8c Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Mon Oct 14 11:28:14 2024 +0800 improve spill counters (#41724) --- be/src/pipeline/dependency.cpp | 22 +++ be/src/pipeline/dependency.h | 35 ++++ be/src/pipeline/exec/aggregation_sink_operator.cpp | 1 - .../distinct_streaming_aggregation_operator.cpp | 1 - be/src/pipeline/exec/operator.h | 214 +++++++++++++++++---- .../exec/partitioned_aggregation_sink_operator.cpp | 14 +- .../exec/partitioned_aggregation_sink_operator.h | 11 +- .../partitioned_aggregation_source_operator.cpp | 29 +-- .../exec/partitioned_hash_join_probe_operator.cpp | 93 +++++---- .../exec/partitioned_hash_join_probe_operator.h | 5 - .../exec/partitioned_hash_join_sink_operator.cpp | 125 ++++++++---- .../exec/partitioned_hash_join_sink_operator.h | 1 + be/src/pipeline/exec/spill_sort_sink_operator.cpp | 45 +++-- be/src/pipeline/exec/spill_sort_sink_operator.h | 1 + .../pipeline/exec/spill_sort_source_operator.cpp | 42 ++-- be/src/pipeline/exec/spill_sort_source_operator.h | 4 - be/src/pipeline/exec/spill_utils.h | 35 +++- .../exec/streaming_aggregation_operator.cpp | 1 - be/src/vec/core/block.h | 12 +- be/src/vec/spill/spill_reader.cpp | 16 +- be/src/vec/spill/spill_reader.h | 24 ++- be/src/vec/spill/spill_stream.cpp | 40 ++-- be/src/vec/spill/spill_stream.h | 32 +-- be/src/vec/spill/spill_stream_manager.cpp | 1 + be/src/vec/spill/spill_writer.cpp | 42 ++-- be/src/vec/spill/spill_writer.h | 44 +++-- 26 files changed, 603 insertions(+), 287 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 315e76a2426..cdde0c6330c 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -327,6 +327,19 @@ void PartitionedAggSharedState::init_spill_params(size_t 
spill_partition_count_b } } +void PartitionedAggSharedState::update_spill_stream_profiles(RuntimeProfile* source_profile) { + for (auto& partition : spill_partitions) { + if (partition->spilling_stream_) { + partition->spilling_stream_->update_shared_profiles(source_profile); + } + for (auto& stream : partition->spill_streams_) { + if (stream) { + stream->update_shared_profiles(source_profile); + } + } + } +} + Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id, RuntimeProfile* profile, vectorized::SpillStreamSPtr& spill_stream) { @@ -339,6 +352,7 @@ Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id, std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), profile)); spill_streams_.emplace_back(spilling_stream_); spill_stream = spilling_stream_; + spill_stream->set_write_counters(profile); return Status::OK(); } void AggSpillPartition::close() { @@ -365,6 +379,14 @@ void PartitionedAggSharedState::close() { spill_partitions.clear(); } +void SpillSortSharedState::update_spill_stream_profiles(RuntimeProfile* source_profile) { + for (auto& stream : sorted_streams) { + if (stream) { + stream->update_shared_profiles(source_profile); + } + } +} + void SpillSortSharedState::close() { // need to use CAS instead of only `if (!is_closed)` statement, // to avoid concurrent entry of close() both pass the if statement diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 3cf1cbff9fa..20251aed294 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -426,14 +426,36 @@ private: Status _destroy_agg_status(vectorized::AggregateDataPtr data); }; +struct BasicSpillSharedState { + virtual ~BasicSpillSharedState() = default; + + // These two counters are shared to spill source operators as the initial value + // of 'SpillWriteFileCurrentSize' and 'SpillWriteFileCurrentCount'. 
+ // Total bytes of spill data written to disk file(after serialized) + RuntimeProfile::Counter* _spill_write_file_data_size = nullptr; + RuntimeProfile::Counter* _spill_file_total_count = nullptr; + + void setup_shared_profile(RuntimeProfile* sink_profile) { + _spill_file_total_count = + ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileTotalCount", TUnit::UNIT, 1); + _spill_write_file_data_size = + ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileTotalSize", TUnit::BYTES, 1); + } + + virtual void update_spill_stream_profiles(RuntimeProfile* source_profile) = 0; +}; + struct AggSpillPartition; struct PartitionedAggSharedState : public BasicSharedState, + public BasicSpillSharedState, public std::enable_shared_from_this<PartitionedAggSharedState> { ENABLE_FACTORY_CREATOR(PartitionedAggSharedState) PartitionedAggSharedState() = default; ~PartitionedAggSharedState() override = default; + void update_spill_stream_profiles(RuntimeProfile* source_profile) override; + void init_spill_params(size_t spill_partition_count_bits); void close(); @@ -498,6 +520,7 @@ public: }; struct SpillSortSharedState : public BasicSharedState, + public BasicSpillSharedState, public std::enable_shared_from_this<SpillSortSharedState> { ENABLE_FACTORY_CREATOR(SpillSortSharedState) @@ -515,6 +538,9 @@ struct SpillSortSharedState : public BasicSharedState, LOG(INFO) << "spill sort block batch row count: " << spill_block_batch_row_count; } } + + void update_spill_stream_profiles(RuntimeProfile* source_profile) override; + void close(); SortSharedState* in_mem_shared_state = nullptr; @@ -622,9 +648,18 @@ struct HashJoinSharedState : public JoinSharedState { struct PartitionedHashJoinSharedState : public HashJoinSharedState, + public BasicSpillSharedState, public std::enable_shared_from_this<PartitionedHashJoinSharedState> { ENABLE_FACTORY_CREATOR(PartitionedHashJoinSharedState) + void update_spill_stream_profiles(RuntimeProfile* source_profile) override { + for (auto& stream : 
spilled_streams) { + if (stream) { + stream->update_shared_profiles(source_profile); + } + } + } + std::unique_ptr<RuntimeState> inner_runtime_state; std::shared_ptr<HashJoinSharedState> inner_shared_state; std::vector<std::unique_ptr<vectorized::MutableBlock>> partitioned_build_blocks; diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 3dd43c3c4d8..79d1460cd06 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -66,7 +66,6 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime"); - _exec_timer = ADD_TIMER(Base::profile(), "ExecTime"); _merge_timer = ADD_TIMER(Base::profile(), "MergeTime"); _expr_timer = ADD_TIMER(Base::profile(), "ExprTime"); _serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime"); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 79347064048..b0e635bef84 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -72,7 +72,6 @@ Status DistinctStreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_init_timer); _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); - _exec_timer = ADD_TIMER(Base::profile(), "ExecTime"); _hash_table_compute_timer = ADD_TIMER(Base::profile(), "HashTableComputeTime"); _hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime"); _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 649bc70c238..da0cc008f06 
100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -287,28 +287,137 @@ public: Status init(RuntimeState* state, LocalStateInfo& info) override { RETURN_IF_ERROR(PipelineXLocalState<SharedStateArg>::init(state, info)); - _spill_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTime", 1); - _spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillRecoverTime", 1); - _spill_read_data_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDataTime", 1); - _spill_deserialize_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillDeserializeTime", 1); - _spill_read_bytes = - ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize", TUnit::BYTES, 1); - _spill_wait_in_queue_timer = - ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", 1); - _spill_write_wait_io_timer = - ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", 1); - _spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", 1); + + _spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1); + init_spill_read_counters(); + return Status::OK(); } - RuntimeProfile::Counter* _spill_timer = nullptr; - RuntimeProfile::Counter* _spill_recover_time; - RuntimeProfile::Counter* _spill_read_data_time; - RuntimeProfile::Counter* _spill_deserialize_time; - RuntimeProfile::Counter* _spill_read_bytes; - RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr; - RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr; - RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr; + void init_spill_write_counters() { + _spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTime", 1); + + _spill_write_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL( + Base::profile(), "SpillWriteTaskWaitInQueueCount", TUnit::UNIT, 1); + _spill_writing_task_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteTaskCount", TUnit::UNIT, 1); + _spill_write_wait_in_queue_timer = + 
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTaskWaitInQueueTime", 1); + + _spill_write_file_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteFileTime", 1); + + _spill_write_serialize_block_timer = + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteSerializeBlockTime", 1); + _spill_write_block_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); + _spill_write_block_data_size = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockDataSize", TUnit::BYTES, 1); + _spill_write_file_data_size = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalSize", TUnit::BYTES, 1); + _spill_write_rows_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows", TUnit::UNIT, 1); + _spill_file_total_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalCount", TUnit::UNIT, 1); + } + + void init_spill_read_counters() { + // Spill read counters + _spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillRecoverTime", 1); + + _spill_read_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL( + Base::profile(), "SpillReadTaskWaitInQueueCount", TUnit::UNIT, 1); + _spill_reading_task_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadTaskCount", TUnit::UNIT, 1); + _spill_read_wait_in_queue_timer = + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadTaskWaitInQueueTime", 1); + + _spill_read_file_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadFileTime", 1); + _spill_read_derialize_block_timer = + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDerializeBlockTime", 1); + + _spill_read_block_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockCount", TUnit::UNIT, 1); + _spill_read_block_data_size = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockDataSize", TUnit::BYTES, 1); + _spill_read_file_size = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileSize", TUnit::BYTES, 1); + _spill_read_rows_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadRows", 
TUnit::UNIT, 1); + _spill_read_file_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileCount", TUnit::UNIT, 1); + + _spill_file_current_size = ADD_COUNTER_WITH_LEVEL( + Base::profile(), "SpillWriteFileCurrentSize", TUnit::BYTES, 1); + _spill_file_current_count = ADD_COUNTER_WITH_LEVEL( + Base::profile(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1); + } + + // On first call only: seed 'SpillWriteFileCurrentSize'/'SpillWriteFileCurrentCount' from the + // spill sink operator's shared 'SpillWriteFileTotalSize'/'SpillWriteFileTotalCount' counters, + // then pass this profile to the shared state's update_spill_stream_profiles(). + void copy_shared_spill_profile() { + if (_copy_shared_spill_profile) { + _copy_shared_spill_profile = false; + const auto* spill_shared_state = (const BasicSpillSharedState*)Base::_shared_state; + COUNTER_SET(_spill_file_current_size, + spill_shared_state->_spill_write_file_data_size->value()); + COUNTER_SET(_spill_file_current_count, + spill_shared_state->_spill_file_total_count->value()); + Base::_shared_state->update_spill_stream_profiles(Base::profile()); + } + } + + // Total time of spill, including spill task scheduling time, + // serialize block time, write disk file time, + // and read disk file time, deserialize block time etc. + RuntimeProfile::Counter* _spill_total_timer = nullptr; + + // Spill write counters + // Total time of spill write, including serialize block time, write disk file, + // and wait in queue time, etc.
+ RuntimeProfile::Counter* _spill_write_timer = nullptr; + + RuntimeProfile::Counter* _spill_write_wait_in_queue_task_count = nullptr; + RuntimeProfile::Counter* _spill_writing_task_count = nullptr; + RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr; + + // Total time of writing file + RuntimeProfile::Counter* _spill_write_file_timer = nullptr; + RuntimeProfile::Counter* _spill_write_serialize_block_timer = nullptr; + // Original count of spilled Blocks + // One big Block may be split into multiple small Blocks when actually written to disk file. + RuntimeProfile::Counter* _spill_write_block_count = nullptr; + // Total bytes of spill data in Block format (in-memory format) + RuntimeProfile::Counter* _spill_write_block_data_size = nullptr; + // Total bytes of spill data written to disk file (after serialization) + RuntimeProfile::Counter* _spill_write_file_data_size = nullptr; + RuntimeProfile::Counter* _spill_write_rows_count = nullptr; + RuntimeProfile::Counter* _spill_file_total_count = nullptr; + RuntimeProfile::Counter* _spill_file_current_count = nullptr; + // Spilled file total size + RuntimeProfile::Counter* _spill_file_total_size = nullptr; + // Current spilled file size + RuntimeProfile::Counter* _spill_file_current_size = nullptr; + + // Spill read counters + // Total time of recovering spilled data, including read file time, deserialize time, etc.
+ RuntimeProfile::Counter* _spill_recover_time = nullptr; + + RuntimeProfile::Counter* _spill_read_wait_in_queue_task_count = nullptr; + RuntimeProfile::Counter* _spill_reading_task_count = nullptr; + RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr; + + RuntimeProfile::Counter* _spill_read_file_time = nullptr; + RuntimeProfile::Counter* _spill_read_derialize_block_timer = nullptr; + RuntimeProfile::Counter* _spill_read_block_count = nullptr; + // Total bytes of read data in Block format(in memory format) + RuntimeProfile::Counter* _spill_read_block_data_size = nullptr; + // Total bytes of spill data read from disk file + RuntimeProfile::Counter* _spill_read_file_size = nullptr; + RuntimeProfile::Counter* _spill_read_rows_count = nullptr; + RuntimeProfile::Counter* _spill_read_file_count = nullptr; + + bool _copy_shared_spill_profile = true; }; class DataSinkOperatorXBase; @@ -589,19 +698,28 @@ public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override { RETURN_IF_ERROR(Base::init(state, info)); - _spill_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTime", 1); - _spill_serialize_block_timer = - ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", 1); - _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", 1); - _spill_data_size = - ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", TUnit::BYTES, 1); - _spill_block_count = + _spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1); + + _spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTime", 1); + + _spill_write_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL( + Base::profile(), "SpillWriteTaskWaitInQueueCount", TUnit::UNIT, 1); + _spill_writing_task_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteTaskCount", TUnit::UNIT, 1); + _spill_write_wait_in_queue_timer = + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTaskWaitInQueueTime", 1); + + _spill_write_file_timer 
= ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteFileTime", 1); + + _spill_write_serialize_block_timer = + ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteSerializeBlockTime", 1); + _spill_write_block_count = ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); - _spill_wait_in_queue_timer = - ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", 1); - _spill_write_wait_io_timer = - ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", 1); - _spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", 1); + _spill_write_block_data_size = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockDataSize", TUnit::BYTES, 1); + _spill_write_rows_count = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows", TUnit::UNIT, 1); + _spill_max_rows_of_partition = ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMaxRowsOfPartition", TUnit::UNIT, 1); _spill_min_rows_of_partition = @@ -633,14 +751,32 @@ public: std::vector<int64_t> _rows_in_partitions; - RuntimeProfile::Counter* _spill_timer = nullptr; - RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr; - RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; - RuntimeProfile::Counter* _spill_data_size = nullptr; - RuntimeProfile::Counter* _spill_block_count = nullptr; - RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr; - RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr; - RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr; + // Total time of spill, including spill task scheduling time, + // serialize block time, write disk file time, + // and read disk file time, deserialize block time etc. + RuntimeProfile::Counter* _spill_total_timer = nullptr; + + // Spill write counters + // Total time of spill write, including serialize block time, write disk file, + // and wait in queue time, etc. 
+ RuntimeProfile::Counter* _spill_write_timer = nullptr; + + RuntimeProfile::Counter* _spill_write_wait_in_queue_task_count = nullptr; + RuntimeProfile::Counter* _spill_writing_task_count = nullptr; + RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr; + + // Total time of writing file + RuntimeProfile::Counter* _spill_write_file_timer = nullptr; + RuntimeProfile::Counter* _spill_write_serialize_block_timer = nullptr; + // Original count of spilled Blocks + // One Big Block maybe split into multiple small Blocks when actually written to disk file. + RuntimeProfile::Counter* _spill_write_block_count = nullptr; + // Total bytes of spill data in Block format(in memory format) + RuntimeProfile::Counter* _spill_write_block_data_size = nullptr; + RuntimeProfile::Counter* _spill_write_rows_count = nullptr; + // Spilled file total size + RuntimeProfile::Counter* _spill_file_total_size = nullptr; + RuntimeProfile::Counter* _spill_max_rows_of_partition = nullptr; RuntimeProfile::Counter* _spill_min_rows_of_partition = nullptr; }; diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 1a86fdb2a9d..f5c09459f85 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -76,6 +76,7 @@ Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state, Status PartitionedAggSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_open_timer); + _shared_state->setup_shared_profile(_profile); return Base::open(state); } @@ -301,8 +302,17 @@ Status PartitionedAggSinkLocalState::revoke_memory( state->get_query_ctx()->increase_revoking_tasks_count(); auto spill_runnable = std::make_shared<SpillRunnable>( - state, _shared_state->shared_from_this(), + state, _profile, true, _shared_state->shared_from_this(), [this, &parent, state, query_id, 
size_to_revoke, spill_context, submit_timer] { + auto submit_elapsed_time = submit_timer.elapsed_time(); + _spill_write_wait_in_queue_timer->update(submit_elapsed_time); + exec_time_counter()->update(submit_elapsed_time); + _spill_total_timer->update(submit_elapsed_time); + + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_spill_total_timer); + SCOPED_TIMER(_spill_write_timer); + DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", { auto st = Status::InternalError( "fault_inject partitioned_agg_sink " @@ -310,8 +320,6 @@ Status PartitionedAggSinkLocalState::revoke_memory( ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st); return st; }); - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_TIMER(Base::_spill_timer); Defer defer {[&]() { if (!_shared_state->sink_status.ok() || state->is_cancelled()) { if (!_shared_state->sink_status.ok()) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 9b70da54943..fa32e032303 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -150,10 +150,6 @@ public: auto status = spill_partition->get_spill_stream(state, Base::_parent->node_id(), Base::profile(), spill_stream); RETURN_IF_ERROR(status); - spill_stream->set_write_counters(Base::_spill_serialize_block_timer, - Base::_spill_block_count, Base::_spill_data_size, - Base::_spill_write_disk_timer, - Base::_spill_write_wait_io_timer, memory_used_counter()); status = to_block(context, keys, values, null_key_data); RETURN_IF_ERROR(status); @@ -168,14 +164,9 @@ public: keys.clear(); values.clear(); } - status = spill_stream->prepare_spill(); + status = spill_stream->spill_block(state, block_, false); RETURN_IF_ERROR(status); - { - SCOPED_TIMER(_spill_write_disk_timer); - status = spill_stream->spill_block(state, block_, false); - } - 
RETURN_IF_ERROR(status); status = spill_partition->flush_if_full(); _reset_tmp_data(); return status; diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index c6a6c09f01b..1b49c0d3768 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -137,6 +137,7 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) { Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); + local_state.copy_shared_spill_profile(); Defer defer {[&]() { if (!local_state._status.ok() || *eos) { local_state._shared_state->close(); @@ -226,8 +227,6 @@ Status PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) { Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, bool& has_data) { const auto query_id = state->query_id(); - MonotonicStopWatch submit_timer; - submit_timer.start(); if (_shared_state->spill_partitions.empty()) { _shared_state->close(); @@ -236,10 +235,7 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b } has_data = true; - auto spill_func = [this, state, query_id, submit_timer] { - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - MonotonicStopWatch execution_timer; - execution_timer.start(); + auto spill_func = [this, state, query_id] { Defer defer {[&]() { if (!_status.ok() || state->is_cancelled()) { if (!_status.ok()) { @@ -255,14 +251,11 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b while (!state->is_cancelled() && !has_agg_data && !_shared_state->spill_partitions.empty()) { for (auto& stream : _shared_state->spill_partitions[0]->spill_streams_) { - stream->set_read_counters(Base::_spill_read_data_time, - Base::_spill_deserialize_time, 
Base::_spill_read_bytes, - Base::_spill_read_wait_io_timer); + stream->set_read_counters(profile()); vectorized::Block block; bool eos = false; while (!eos && !state->is_cancelled()) { { - SCOPED_TIMER(Base::_spill_recover_time); DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data", { _status = Status::Error<INTERNAL_ERROR>( @@ -298,7 +291,18 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b return _status; }; - auto exception_catch_func = [spill_func, query_id, this]() { + MonotonicStopWatch submit_timer; + submit_timer.start(); + auto exception_catch_func = [spill_func, query_id, submit_timer, this]() { + auto submit_elapsed_time = submit_timer.elapsed_time(); + _spill_read_wait_in_queue_timer->update(submit_elapsed_time); + exec_time_counter()->update(submit_elapsed_time); + _spill_total_timer->update(submit_elapsed_time); + + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_spill_total_timer); + SCOPED_TIMER(_spill_recover_time); + DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel", { auto st = Status::InternalError( "fault_inject partitioned_agg_source " @@ -320,7 +324,8 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b }); _spill_dependency->block(); return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( - std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), + std::make_shared<SpillRunnable>(state, _runtime_profile.get(), false, + _shared_state->shared_from_this(), exception_catch_func)); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 7b60c9a3e2f..6df2a7ca4ab 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -42,6 +42,8 @@ 
PartitionedHashJoinProbeLocalState::PartitionedHashJoinProbeLocalState(RuntimeSt Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXSpillLocalState::init(state, info)); + init_spill_write_counters(); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); _internal_runtime_profile.reset(new RuntimeProfile("internal_profile")); @@ -75,14 +77,6 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI _probe_blocks_bytes = ADD_COUNTER_WITH_LEVEL(profile(), "ProbeBlocksBytes", TUnit::BYTES, 1); - _spill_serialize_block_timer = - ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", 1); - _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", 1); - _spill_data_size = - ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", TUnit::BYTES, 1); - _spill_block_count = - ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); - // Build phase _build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase"); _build_rows_counter = ADD_CHILD_COUNTER(profile(), "BuildRows", TUnit::UNIT, "BuildPhase"); @@ -182,11 +176,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); auto query_id = state->query_id(); - MonotonicStopWatch submit_timer; - submit_timer.start(); - - auto spill_func = [query_id, state, submit_timer, this] { - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + auto spill_func = [query_id, state, this] { SCOPED_TIMER(_spill_probe_timer); size_t not_revoked_size = 0; @@ -215,10 +205,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( state, spilling_stream, print_id(state->query_id()), "hash_probe", _parent->node_id(), std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), _runtime_profile.get())); - 
RETURN_IF_ERROR(spilling_stream->prepare_spill()); - spilling_stream->set_write_counters( - _spill_serialize_block_timer, _spill_block_count, _spill_data_size, - _spill_write_disk_timer, _spill_write_wait_io_timer, memory_used_counter()); + spilling_stream->set_write_counters(_runtime_profile.get()); } auto merged_block = vectorized::MutableBlock::create_unique(blocks[0].clone_empty()); @@ -256,8 +243,19 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( return Status::OK(); }; - auto exception_catch_func = [query_id, spill_func, spill_context, this]() { - SCOPED_TIMER(_spill_timer); + MonotonicStopWatch submit_timer; + submit_timer.start(); + + auto exception_catch_func = [query_id, spill_func, spill_context, submit_timer, this]() { + auto submit_elapsed_time = submit_timer.elapsed_time(); + _spill_write_wait_in_queue_timer->update(submit_elapsed_time); + exec_time_counter()->update(submit_elapsed_time); + _spill_total_timer->update(submit_elapsed_time); + + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_spill_total_timer); + SCOPED_TIMER(_spill_write_timer); + DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( query_id, Status::InternalError("fault_inject partitioned_hash_join_probe " @@ -286,7 +284,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( "fault_inject partitioned_hash_join_probe spill_probe_blocks submit_func failed"); }); - auto spill_runnable = std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), + auto spill_runnable = std::make_shared<SpillRunnable>(state, _runtime_profile.get(), true, + _shared_state->shared_from_this(), exception_catch_func); return spill_io_pool->submit(std::move(spill_runnable)); } @@ -296,8 +295,7 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in if (probe_spilling_stream) { RETURN_IF_ERROR(probe_spilling_stream->spill_eof()); - 
probe_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time, - _spill_read_bytes, _spill_read_wait_io_timer); + probe_spilling_stream->set_read_counters(profile()); } return Status::OK(); @@ -314,20 +312,15 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim if (!spilled_stream) { return Status::OK(); } - - spilled_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time, - _spill_read_bytes, _spill_read_wait_io_timer); + spilled_stream->set_read_counters(profile()); std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder = _shared_state->shared_from_this(); auto query_id = state->query_id(); - MonotonicStopWatch submit_timer; - submit_timer.start(); - auto read_func = [this, query_id, state, spilled_stream = spilled_stream, shared_state_holder, - submit_timer, partition_index] { + partition_index] { auto shared_state_sptr = shared_state_holder.lock(); if (!shared_state_sptr || state->is_cancelled()) { LOG(INFO) << "query: " << print_id(query_id) @@ -335,7 +328,6 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim return; } - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_recovery_build_timer); bool eos = false; @@ -396,7 +388,19 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim } }; - auto exception_catch_func = [read_func, query_id, this]() { + MonotonicStopWatch submit_timer; + submit_timer.start(); + + auto exception_catch_func = [read_func, query_id, submit_timer, this]() { + auto submit_elapsed_time = submit_timer.elapsed_time(); + _spill_read_wait_in_queue_timer->update(submit_elapsed_time); + exec_time_counter()->update(submit_elapsed_time); + _spill_total_timer->update(submit_elapsed_time); + + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_spill_total_timer); + SCOPED_TIMER(_spill_recover_time); + 
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( query_id, Status::InternalError("fault_inject partitioned_hash_join_probe " @@ -436,7 +440,8 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim "fault_inject partitioned_hash_join_probe " "recovery_build_blocks submit_func failed"); }); - auto spill_runnable = std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), + auto spill_runnable = std::make_shared<SpillRunnable>(state, _runtime_profile.get(), false, + _shared_state->shared_from_this(), exception_catch_func); VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id() << ", task id: " << state->task_id() << ", partition: " << partition_index @@ -475,14 +480,12 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim return Status::OK(); } + spilled_stream->set_read_counters(profile()); auto& blocks = _probe_blocks[partition_index]; auto query_id = state->query_id(); - MonotonicStopWatch submit_timer; - submit_timer.start(); - auto read_func = [this, query_id, &spilled_stream, &blocks, submit_timer] { - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + auto read_func = [this, query_id, &spilled_stream, &blocks] { SCOPED_TIMER(_recovery_probe_timer); vectorized::Block block; @@ -518,7 +521,18 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim } }; - auto exception_catch_func = [read_func, query_id, this]() { + MonotonicStopWatch submit_timer; + submit_timer.start(); + auto exception_catch_func = [read_func, query_id, submit_timer, this]() { + auto submit_elapsed_time = submit_timer.elapsed_time(); + _spill_read_wait_in_queue_timer->update(submit_elapsed_time); + exec_time_counter()->update(submit_elapsed_time); + _spill_total_timer->update(submit_elapsed_time); + + SCOPED_TIMER(exec_time_counter()); + 
SCOPED_TIMER(_spill_total_timer); + SCOPED_TIMER(_spill_recover_time); + DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( query_id, Status::InternalError("fault_inject partitioned_hash_join_probe " @@ -549,7 +563,8 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim "recovery_probe_blocks submit_func failed"); }); return spill_io_pool->submit(std::make_shared<SpillRunnable>( - state, _shared_state->shared_from_this(), exception_catch_func)); + state, _runtime_profile.get(), false, _shared_state->shared_from_this(), + exception_catch_func)); } PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool* pool, @@ -957,6 +972,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori bool* eos) { *eos = false; auto& local_state = get_local_state(state); + local_state.copy_shared_spill_profile(); const auto need_to_spill = local_state._shared_state->need_to_spill; #ifndef NDEBUG Defer eos_check_defer([&] { @@ -1017,6 +1033,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori } local_state.add_num_rows_returned(block->rows()); + COUNTER_UPDATE(local_state._blocks_returned_counter, 1); if (*eos) { _update_profile_from_internal_states(local_state); } diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index e66b730685b..6ecbdd01e49 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -109,11 +109,6 @@ private: RuntimeProfile::Counter* _recovery_probe_blocks = nullptr; RuntimeProfile::Counter* _recovery_probe_timer = nullptr; - RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr; - RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; - RuntimeProfile::Counter* _spill_data_size = 
nullptr; - RuntimeProfile::Counter* _spill_block_count = nullptr; - RuntimeProfile::Counter* _build_phase_label = nullptr; RuntimeProfile::Counter* _build_rows_counter = nullptr; RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index f7d38fe9d5d..8c9990da1ef 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -64,6 +64,7 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); + _shared_state->setup_shared_profile(_profile); RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state)); auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); for (uint32_t i = 0; i != p._partition_count; ++i) { @@ -72,10 +73,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) { state, spilling_stream, print_id(state->query_id()), fmt::format("hash_build_sink_{}", i), _parent->node_id(), std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), _profile)); - RETURN_IF_ERROR(spilling_stream->prepare_spill()); - spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, - _spill_data_size, _spill_write_disk_timer, - _spill_write_wait_io_timer, memory_used_counter()); + spilling_stream->set_write_counters(_profile); } return p._partitioner->clone(state, _partitioner); } @@ -117,6 +115,32 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state return mem_size; } +void PartitionedHashJoinSinkLocalState::update_memory_usage() { + if (!_shared_state->need_to_spill) { + if (_shared_state->inner_shared_state) { + auto* inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state(); + if 
(inner_sink_state_) { + auto* inner_sink_state = + assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_); + COUNTER_SET(_memory_used_counter, inner_sink_state->_memory_used_counter->value()); + COUNTER_SET(_peak_memory_usage_counter, + inner_sink_state->_memory_used_counter->value()); + } + } + return; + } + + int64_t mem_size = 0; + auto& partitioned_blocks = _shared_state->partitioned_build_blocks; + for (auto& block : partitioned_blocks) { + if (block) { + mem_size += block->allocated_bytes(); + } + } + COUNTER_SET(_memory_used_counter, mem_size); + COUNTER_SET(_peak_memory_usage_counter, mem_size); +} + size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) { size_t size_to_reserve = 0; auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); @@ -136,14 +160,21 @@ size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); + HashJoinBuildSinkLocalState* inner_sink_state {nullptr}; + if (auto* tmp_sink_state = _shared_state->inner_runtime_state->get_sink_local_state()) { + inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(tmp_sink_state); + } _shared_state->inner_shared_state->hash_table_variants.reset(); + if (inner_sink_state) { + COUNTER_UPDATE(_memory_used_counter, + -(inner_sink_state->_hash_table_memory_usage->value() + + inner_sink_state->_build_arena_memory_usage->value())); + } auto row_desc = p._child->row_desc(); const auto num_slots = row_desc.num_slots(); vectorized::Block build_block; - size_t block_old_mem = 0; - auto* inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state(); - if (inner_sink_state_) { - auto* inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_); + int64_t block_old_mem = 0; + if (inner_sink_state) 
{ build_block = inner_sink_state->_build_side_mutable_block.to_block(); block_old_mem = build_block.allocated_bytes(); } @@ -158,12 +189,12 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( } if (build_block.columns() > num_slots) { - build_block.erase(num_slots); - memory_used_counter()->update(build_block.allocated_bytes() - block_old_mem); + vectorized::Block::erase_useless_column(&build_block, num_slots); + COUNTER_UPDATE(_memory_used_counter, build_block.allocated_bytes() - block_old_mem); } auto spill_func = [build_block = std::move(build_block), state, this]() mutable { - Defer defer {[&]() { memory_used_counter()->set((int64_t)revocable_mem_size(state)); }}; + Defer defer1 {[&]() { update_memory_usage(); }}; auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); auto& partitioned_blocks = _shared_state->partitioned_build_blocks; std::vector<std::vector<uint32_t>> partitions_indexes(p._partition_count); @@ -175,7 +206,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( auto flush_rows = [&state, this](std::unique_ptr<vectorized::MutableBlock>& partition_block, vectorized::SpillStreamSPtr& spilling_stream) { auto block = partition_block->to_block(); - Defer defer {[&]() { memory_used_counter()->update(-block.allocated_bytes()); }}; auto status = spilling_stream->spill_block(state, block, false); if (!status.ok()) { @@ -198,9 +228,11 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( sub_block.get_by_position(i).column = build_block.get_by_position(i).column->cut(offset, this_run); } - auto sub_blocks_memory_usage = sub_block.allocated_bytes(); - memory_used_counter()->update(sub_blocks_memory_usage); - Defer defer {[&]() { memory_used_counter()->update(-sub_blocks_memory_usage); }}; + int64_t sub_blocks_memory_usage = sub_block.allocated_bytes(); + COUNTER_UPDATE(_memory_used_counter, sub_blocks_memory_usage); + COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value()); 
+ Defer defer2 { + [&]() { COUNTER_UPDATE(_memory_used_counter, -sub_blocks_memory_usage); }}; offset += this_run; const auto is_last_block = offset == total_rows; @@ -225,12 +257,11 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( partition_block = vectorized::MutableBlock::create_unique(build_block.clone_empty()); } - auto old_mem = partition_block->allocated_bytes(); + int64_t old_mem = partition_block->allocated_bytes(); { SCOPED_TIMER(_partition_shuffle_timer); Status st = partition_block->add_rows(&sub_block, begin, end); - memory_used_counter()->update(partition_block->allocated_bytes() - old_mem); if (!st.ok()) { std::unique_lock<std::mutex> lock(_spill_lock); _spill_status = st; @@ -240,6 +271,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( } partitions_indexes[partition_idx].clear(); } + int64_t new_mem = partition_block->allocated_bytes(); if (partition_block->rows() >= reserved_size || is_last_block) { if (!flush_rows(partition_block, spilling_stream)) { @@ -247,7 +279,10 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( } partition_block = vectorized::MutableBlock::create_unique(build_block.clone_empty()); - memory_used_counter()->update(partition_block->allocated_bytes()); + COUNTER_UPDATE(_memory_used_counter, -new_mem); + } else { + COUNTER_UPDATE(_memory_used_counter, new_mem - old_mem); + COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value()); } } } @@ -255,8 +290,18 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( _spill_dependency->set_ready(); }; - auto exception_catch_func = [spill_func, spill_context, this]() mutable { - SCOPED_TIMER(_spill_timer); + MonotonicStopWatch submit_timer; + submit_timer.start(); + auto exception_catch_func = [spill_func, spill_context, submit_timer, this]() mutable { + auto submit_elapsed_time = submit_timer.elapsed_time(); + _spill_write_wait_in_queue_timer->update(submit_elapsed_time); + 
exec_time_counter()->update(submit_elapsed_time); + _spill_total_timer->update(submit_elapsed_time); + + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_spill_total_timer); + SCOPED_TIMER(_spill_write_timer); + auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(spill_func()); return Status::OK(); @@ -274,8 +319,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( } }; - auto spill_runnable = std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), - exception_catch_func); + auto spill_runnable = std::make_shared<SpillRunnable>( + state, _profile, true, _shared_state->shared_from_this(), exception_catch_func); auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); @@ -291,6 +336,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( Status PartitionedHashJoinSinkLocalState::revoke_memory( RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { + SCOPED_TIMER(_spill_total_timer); VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", task: " << state->task_id() << " hash join sink " << _parent->node_id() << " revoke_memory" << ", eos: " << _child_eos; @@ -307,6 +353,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( auto query_id = state->query_id(); + auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); + DCHECK(spill_io_pool != nullptr); + for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) { vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i]; auto& mutable_block = _shared_state->partitioned_build_blocks[i]; @@ -319,10 +368,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( DCHECK(spilling_stream != nullptr); - auto* spill_io_pool = - ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); - DCHECK(spill_io_pool != nullptr); - MonotonicStopWatch submit_timer; submit_timer.start(); @@ -335,8 +380,17 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory( // so that when a stream finished, it should desc -1 state->get_query_ctx()->increase_revoking_tasks_count(); auto spill_runnable = std::make_shared<SpillRunnable>( - state, _shared_state->shared_from_this(), + state, _profile, true, _shared_state->shared_from_this(), [this, query_id, spilling_stream, i, submit_timer, spill_context] { + auto submit_elapsed_time = submit_timer.elapsed_time(); + _spill_write_wait_in_queue_timer->update(submit_elapsed_time); + exec_time_counter()->update(submit_elapsed_time); + _spill_total_timer->update(submit_elapsed_time); + + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_spill_total_timer); + SCOPED_TIMER(_spill_write_timer); + DBUG_EXECUTE_IF( "fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( @@ -345,7 +399,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( "revoke_memory canceled")); return; }); - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); SCOPED_TIMER(_spill_build_timer); auto status = [&]() { @@ -411,7 +464,7 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state, if (!rows) { return Status::OK(); } - Defer defer {[&]() { memory_used_counter()->set((int64_t)revocable_mem_size(state)); }}; + Defer defer {[&]() { update_memory_usage(); }}; { /// TODO: DO NOT execute build exprs twice(when partition and building hash table) SCOPED_TIMER(_partition_timer); @@ -455,10 +508,9 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk( if (_spill_status_ok) { auto block = partitioned_block->to_block(); - auto block_mem_usage = block.allocated_bytes(); - Defer defer {[&]() { memory_used_counter()->update(-block_mem_usage); }}; + int64_t block_mem_usage = block.allocated_bytes(); + Defer defer {[&]() { COUNTER_UPDATE(memory_used_counter(), -block_mem_usage); }}; partitioned_block = vectorized::MutableBlock::create_unique(block.clone_empty()); - 
memory_used_counter()->update(partitioned_block->allocated_bytes()); auto st = spilling_stream->spill_block(state(), block, false); if (!st.ok()) { _spill_status_ok = false; @@ -609,10 +661,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B "fault_inject partitioned_hash_join_sink " "sink_eos failed"); }); - Defer defer {[&]() { - local_state.memory_used_counter()->set( - (int64_t)local_state.revocable_mem_size(state)); - }}; + Defer defer {[&]() { local_state.update_memory_usage(); }}; RETURN_IF_ERROR(_inner_sink_operator->sink( local_state._shared_state->inner_runtime_state.get(), in_block, eos)); VLOG_DEBUG << "hash join sink " << node_id() << " sink eos, set_ready_to_read" @@ -650,9 +699,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B "fault_inject partitioned_hash_join_sink " "sink failed"); }); - Defer defer {[&]() { - local_state.memory_used_counter()->set((int64_t)local_state.revocable_mem_size(state)); - }}; + Defer defer {[&]() { local_state.update_memory_usage(); }}; RETURN_IF_ERROR(_inner_sink_operator->sink( local_state._shared_state->inner_runtime_state.get(), in_block, eos)); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index d3725997882..aaa6d64adf9 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -47,6 +47,7 @@ public: Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context); size_t revocable_mem_size(RuntimeState* state) const; [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos); + void update_memory_usage(); protected: PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 
38e7a173197..5d8355b865d 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -50,6 +50,13 @@ Status SpillSortSinkLocalState::init(doris::RuntimeState* state, return Status::OK(); } +Status SpillSortSinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(Base::exec_time_counter()); + SCOPED_TIMER(Base::_open_timer); + _shared_state->setup_shared_profile(_profile); + return Base::open(state); +} + void SpillSortSinkLocalState::_init_counters() { _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile"); @@ -57,10 +64,7 @@ void SpillSortSinkLocalState::_init_counters() { _merge_block_timer = ADD_TIMER(_profile, "MergeBlockTime"); _sort_blocks_memory_usage = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageSortBlocks", TUnit::BYTES, 1); - _spill_merge_sort_timer = ADD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime", 1); - - _spill_wait_in_queue_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime", 1); } #define UPDATE_PROFILE(counter, name) \ do { \ @@ -201,13 +205,8 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, _shared_state->spill_block_batch_row_count, SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile()); RETURN_IF_ERROR(status); + _spilling_stream->set_write_counters(_profile); - _spilling_stream->set_write_counters( - Base::_spill_serialize_block_timer, Base::_spill_block_count, Base::_spill_data_size, - Base::_spill_write_disk_timer, Base::_spill_write_wait_io_timer, memory_used_counter()); - - status = _spilling_stream->prepare_spill(); - RETURN_IF_ERROR(status); _shared_state->sorted_streams.emplace_back(_spilling_stream); auto& parent = Base::_parent->template cast<Parent>(); @@ -218,11 +217,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, } auto query_id = state->query_id(); - MonotonicStopWatch submit_timer; - submit_timer.start(); - - auto spill_func = [this, state, query_id, &parent, submit_timer] { 
- _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + auto spill_func = [this, state, query_id, &parent] { Defer defer {[&]() { if (!_shared_state->sink_status.ok() || state->is_cancelled()) { if (!_shared_state->sink_status.ok()) { @@ -267,10 +262,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, &eos); } RETURN_IF_ERROR(_shared_state->sink_status); - { - SCOPED_TIMER(Base::_spill_timer); - _shared_state->sink_status = _spilling_stream->spill_block(state, block, eos); - } + _shared_state->sink_status = _spilling_stream->spill_block(state, block, eos); RETURN_IF_ERROR(_shared_state->sink_status); block.clear_column_data(); } @@ -279,7 +271,19 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, return Status::OK(); }; - auto exception_catch_func = [this, query_id, spill_context, spill_func]() { + MonotonicStopWatch submit_timer; + submit_timer.start(); + + auto exception_catch_func = [this, query_id, spill_context, submit_timer, spill_func]() { + auto submit_elapsed_time = submit_timer.elapsed_time(); + _spill_write_wait_in_queue_timer->update(submit_elapsed_time); + exec_time_counter()->update(submit_elapsed_time); + _spill_total_timer->update(submit_elapsed_time); + + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_spill_total_timer); + SCOPED_TIMER(_spill_write_timer); + DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( query_id, Status::InternalError("fault_inject spill_sort_sink " @@ -304,7 +308,8 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, if (status.ok()) { state->get_query_ctx()->increase_revoking_tasks_count(); status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( - std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), + std::make_shared<SpillRunnable>(state, _profile, true, + _shared_state->shared_from_this(), exception_catch_func)); } if 
(!status.ok()) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 7a361199239..086d93a970c 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -36,6 +36,7 @@ public: ~SpillSortSinkLocalState() override = default; Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; Dependency* finishdependency() override { return _finish_dependency.get(); } diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index fe1356381b2..5cc124caaea 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -36,6 +36,7 @@ SpillSortLocalState::SpillSortLocalState(RuntimeState* state, OperatorXBase* par } Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); + init_spill_write_counters(); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); @@ -45,14 +46,6 @@ Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) { _internal_runtime_profile = std::make_unique<RuntimeProfile>("internal_profile"); _spill_merge_sort_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillMergeSortTime", 1); - _spill_serialize_block_timer = - ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", 1); - _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", 1); - _spill_data_size = - ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", TUnit::BYTES, 1); - _spill_block_count = - ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); - _spill_wait_in_queue_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime", 1); return Status::OK(); } @@ -86,11 +79,7 @@ 
Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat auto query_id = state->query_id(); - MonotonicStopWatch submit_timer; - submit_timer.start(); - - auto spill_func = [this, state, query_id, &parent, submit_timer] { - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + auto spill_func = [this, state, query_id, &parent] { SCOPED_TIMER(_spill_merge_sort_timer); Defer defer {[&]() { if (!_status.ok() || state->is_cancelled()) { @@ -136,15 +125,11 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat _shared_state->spill_block_batch_row_count, SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile()); RETURN_IF_ERROR(_status); - _status = tmp_stream->prepare_spill(); - RETURN_IF_ERROR(_status); + tmp_stream->set_write_counters(profile()); _shared_state->sorted_streams.emplace_back(tmp_stream); bool eos = false; - tmp_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, - _spill_data_size, _spill_write_disk_timer, - _spill_write_wait_io_timer, memory_used_counter()); while (!eos && !state->is_cancelled()) { merge_sorted_block.clear_column_data(); { @@ -178,7 +163,19 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat return Status::OK(); }; - auto exception_catch_func = [this, spill_func]() { + MonotonicStopWatch submit_timer; + submit_timer.start(); + + auto exception_catch_func = [this, spill_func, submit_timer]() { + auto submit_elapsed_time = submit_timer.elapsed_time(); + _spill_read_wait_in_queue_timer->update(submit_elapsed_time); + exec_time_counter()->update(submit_elapsed_time); + _spill_total_timer->update(submit_elapsed_time); + + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_spill_total_timer); + SCOPED_TIMER(_spill_recover_time); + _status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); }; @@ -188,7 +185,8 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat 
"merge_sort_spill_data submit_func failed"); }); return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( - std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(), + std::make_shared<SpillRunnable>(state, _runtime_profile.get(), false, + _shared_state->shared_from_this(), exception_catch_func)); } @@ -203,8 +201,7 @@ Status SpillSortLocalState::_create_intermediate_merger( _current_merging_streams.clear(); for (int i = 0; i < num_blocks && !_shared_state->sorted_streams.empty(); ++i) { auto stream = _shared_state->sorted_streams.front(); - stream->set_read_counters(Base::_spill_read_data_time, Base::_spill_deserialize_time, - Base::_spill_read_bytes, Base::_spill_read_wait_io_timer); + stream->set_read_counters(profile()); _current_merging_streams.emplace_back(stream); child_block_suppliers.emplace_back( std::bind(std::mem_fn(&vectorized::SpillStream::read_next_block_sync), stream.get(), @@ -263,6 +260,7 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state) { Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); + local_state.copy_shared_spill_profile(); Defer defer {[&]() { if (!local_state._status.ok() || *eos) { local_state._shared_state->close(); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index ca984e352fc..7536dd15e92 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -65,10 +65,6 @@ protected: std::unique_ptr<RuntimeProfile> _internal_runtime_profile; // counters for spill merge sort RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr; - RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr; - RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; - RuntimeProfile::Counter* _spill_data_size = nullptr; - RuntimeProfile::Counter* _spill_block_count = 
nullptr; }; class SortSourceOperatorX; class SpillSortSourceOperatorX : public OperatorX<SpillSortLocalState> { diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h index d2b157463ae..2ea5cedcdb0 100644 --- a/be/src/pipeline/exec/spill_utils.h +++ b/be/src/pipeline/exec/spill_utils.h @@ -29,6 +29,7 @@ #include "runtime/runtime_state.h" #include "runtime/task_execution_context.h" #include "runtime/thread_context.h" +#include "util/runtime_profile.h" #include "util/threadpool.h" #include "vec/runtime/partitioner.h" @@ -78,12 +79,23 @@ private: class SpillRunnable : public Runnable { public: - SpillRunnable(RuntimeState* state, const std::shared_ptr<BasicSharedState>& shared_state, - std::function<void()> func) + SpillRunnable(RuntimeState* state, RuntimeProfile* profile, bool is_write, + const std::shared_ptr<BasicSharedState>& shared_state, std::function<void()> func) : _state(state), + _is_write(is_write), _task_context_holder(state->get_task_execution_context()), _shared_state_holder(shared_state), - _func(std::move(func)) {} + _func(std::move(func)) { + write_wait_in_queue_task_count = profile->get_counter("SpillWriteTaskWaitInQueueCount"); + writing_task_count = profile->get_counter("SpillWriteTaskCount"); + read_wait_in_queue_task_count = profile->get_counter("SpillReadTaskWaitInQueueCount"); + reading_task_count = profile->get_counter("SpillReadTaskCount"); + if (is_write) { + COUNTER_UPDATE(write_wait_in_queue_task_count, 1); + } else { + COUNTER_UPDATE(read_wait_in_queue_task_count, 1); + } + } ~SpillRunnable() override = default; @@ -94,8 +106,20 @@ public: if (!task_context_holder) { return; } + if (_is_write) { + COUNTER_UPDATE(write_wait_in_queue_task_count, -1); + COUNTER_UPDATE(writing_task_count, 1); + } else { + COUNTER_UPDATE(read_wait_in_queue_task_count, -1); + COUNTER_UPDATE(reading_task_count, 1); + } SCOPED_ATTACH_TASK(_state); Defer defer([&] { + if (_is_write) { + COUNTER_UPDATE(writing_task_count, -1); + } 
else { + COUNTER_UPDATE(reading_task_count, -1); + } std::function<void()> tmp; std::swap(tmp, _func); }); @@ -113,6 +137,11 @@ public: private: RuntimeState* _state; + bool _is_write; + RuntimeProfile::Counter* write_wait_in_queue_task_count = nullptr; + RuntimeProfile::Counter* writing_task_count = nullptr; + RuntimeProfile::Counter* read_wait_in_queue_task_count = nullptr; + RuntimeProfile::Counter* reading_task_count = nullptr; std::weak_ptr<TaskExecutionContext> _task_context_holder; std::weak_ptr<BasicSharedState> _shared_state_holder; std::function<void()> _func; diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index e127565b15f..d69c43dfc46 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -95,7 +95,6 @@ Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); _build_table_convert_timer = ADD_TIMER(Base::profile(), "BuildConvertToPartitionedTime"); _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime"); - _exec_timer = ADD_TIMER(Base::profile(), "ExecTime"); _merge_timer = ADD_TIMER(Base::profile(), "MergeTime"); _expr_timer = ADD_TIMER(Base::profile(), "ExprTime"); _serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime"); diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index f1804601693..5ed3509ae78 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -548,9 +548,11 @@ public: [[nodiscard]] Status merge_impl_ignore_overflow(T&& block) { if (_columns.size() != block.columns()) { return Status::Error<ErrorCode::INTERNAL_ERROR>( - "Merge block not match, self:[columns: {}, types: {}], input:[columns: {}, " + "Merge block not match, self column count: {}, [columns: {}, types: {}], " + "input column count: {}, [columns: {}, " "types: {}], ", - dump_names(), 
dump_types(), block.dump_names(), block.dump_types()); + _columns.size(), dump_names(), dump_types(), block.columns(), + block.dump_names(), block.dump_types()); } for (int i = 0; i < _columns.size(); ++i) { DCHECK(_data_types[i]->equals(*block.get_by_position(i).type)) @@ -583,9 +585,11 @@ public: } else { if (_columns.size() != block.columns()) { return Status::Error<ErrorCode::INTERNAL_ERROR>( - "Merge block not match, self:[columns: {}, types: {}], input:[columns: {}, " + "Merge block not match, self column count: {}, [columns: {}, types: {}], " + "input column count: {}, [columns: {}, " "types: {}], ", - dump_names(), dump_types(), block.dump_names(), block.dump_types()); + _columns.size(), dump_names(), dump_types(), block.columns(), + block.dump_names(), block.dump_types()); } for (int i = 0; i < _columns.size(); ++i) { if (!_data_types[i]->equals(*block.get_by_position(i).type)) { diff --git a/be/src/vec/spill/spill_reader.cpp b/be/src/vec/spill/spill_reader.cpp index f0320ee9b49..d0258344541 100644 --- a/be/src/vec/spill/spill_reader.cpp +++ b/be/src/vec/spill/spill_reader.cpp @@ -37,7 +37,9 @@ Status SpillReader::open() { return Status::OK(); } - SCOPED_TIMER(read_timer_); + SCOPED_TIMER(_read_file_timer); + + COUNTER_UPDATE(_read_file_count, 1); RETURN_IF_ERROR(io::global_local_filesystem()->open_file(file_path_, &file_reader_)); @@ -50,12 +52,14 @@ Status SpillReader::open() { size_t bytes_read = 0; RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result, &bytes_read)); DCHECK(bytes_read == 8); // max_sub_block_size, block count + COUNTER_UPDATE(_read_file_size, bytes_read); // read max sub block size bytes_read = 0; result.data = (char*)&max_sub_block_size_; RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2, result, &bytes_read)); DCHECK(bytes_read == 8); // max_sub_block_size, block count + COUNTER_UPDATE(_read_file_size, bytes_read); size_t buff_size = std::max(block_count_ * sizeof(size_t), max_sub_block_size_); 
try { @@ -73,6 +77,7 @@ Status SpillReader::open() { RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read)); DCHECK(bytes_read == block_count_ * sizeof(size_t)); + COUNTER_UPDATE(_read_file_size, bytes_read); block_start_offsets_.resize(block_count_ + 1); for (size_t i = 0; i < block_count_; ++i) { @@ -103,21 +108,24 @@ Status SpillReader::read(Block* block, bool* eos) { Slice result(read_buff_.get(), bytes_to_read); size_t bytes_read = 0; { - SCOPED_TIMER(read_timer_); + SCOPED_TIMER(_read_file_timer); RETURN_IF_ERROR(file_reader_->read_at(block_start_offsets_[read_block_index_], result, &bytes_read)); } DCHECK(bytes_read == bytes_to_read); - COUNTER_UPDATE(read_bytes_, bytes_read); if (bytes_read > 0) { + COUNTER_UPDATE(_read_file_size, bytes_read); + COUNTER_UPDATE(_read_block_count, 1); { - SCOPED_TIMER(deserialize_timer_); + SCOPED_TIMER(_deserialize_timer); if (!pb_block_.ParseFromArray(result.data, result.size)) { return Status::InternalError("Failed to read spilled block"); } RETURN_IF_ERROR(block->deserialize(pb_block_)); } + COUNTER_UPDATE(_read_block_data_size, block->bytes()); + COUNTER_UPDATE(_read_rows_count, block->rows()); } else { block->clear_column_data(); } diff --git a/be/src/vec/spill/spill_reader.h b/be/src/vec/spill/spill_reader.h index 6694bf91572..fcb7d8d9e0b 100644 --- a/be/src/vec/spill/spill_reader.h +++ b/be/src/vec/spill/spill_reader.h @@ -50,12 +50,14 @@ public: size_t block_count() const { return block_count_; } - void set_counters(RuntimeProfile::Counter* read_timer, - RuntimeProfile::Counter* deserialize_timer, - RuntimeProfile::Counter* read_bytes) { - read_timer_ = read_timer; - deserialize_timer_ = deserialize_timer; - read_bytes_ = read_bytes; + void set_counters(RuntimeProfile* profile) { + _read_file_timer = profile->get_counter("SpillReadFileTime"); + _deserialize_timer = profile->get_counter("SpillReadDerializeBlockTime"); + _read_block_count = profile->get_counter("SpillReadBlockCount"); + 
_read_block_data_size = profile->get_counter("SpillReadBlockDataSize"); + _read_file_size = profile->get_counter("SpillReadFileSize"); + _read_rows_count = profile->get_counter("SpillReadRows"); + _read_file_count = profile->get_counter("SpillReadFileCount"); } private: @@ -71,9 +73,13 @@ private: PBlock pb_block_; - RuntimeProfile::Counter* read_timer_; - RuntimeProfile::Counter* deserialize_timer_; - RuntimeProfile::Counter* read_bytes_; + RuntimeProfile::Counter* _read_file_timer = nullptr; + RuntimeProfile::Counter* _deserialize_timer = nullptr; + RuntimeProfile::Counter* _read_block_count = nullptr; + RuntimeProfile::Counter* _read_block_data_size = nullptr; + RuntimeProfile::Counter* _read_file_size = nullptr; + RuntimeProfile::Counter* _read_rows_count = nullptr; + RuntimeProfile::Counter* _read_file_count = nullptr; }; using SpillReaderUPtr = std::unique_ptr<SpillReader>; diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index 7189fad262c..6f9143b8073 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -44,16 +44,31 @@ SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* d batch_rows_(batch_rows), batch_bytes_(batch_bytes), query_id_(state->query_id()), - profile_(profile) {} + profile_(profile) { + _total_file_count = profile_->get_counter("SpillWriteFileTotalCount"); + _current_file_count = profile_->get_counter("SpillWriteFileCurrentCount"); + _current_file_size = profile_->get_counter("SpillWriteFileCurrentSize"); +} + +void SpillStream::update_shared_profiles(RuntimeProfile* source_op_profile) { + _current_file_count = source_op_profile->get_counter("SpillWriteFileCurrentCount"); + _current_file_size = source_op_profile->get_counter("SpillWriteFileCurrentSize"); +} SpillStream::~SpillStream() { gc(); } void SpillStream::gc() { + if (_current_file_size) { + COUNTER_UPDATE(_current_file_size, -total_written_bytes_); + } bool exists = false; auto status = 
io::global_local_filesystem()->exists(spill_dir_, &exists); if (status.ok() && exists) { + if (_current_file_count) { + COUNTER_UPDATE(_current_file_count, -1); + } auto query_gc_dir = data_dir_->get_spill_data_gc_path(print_id(query_id_)); status = io::global_local_filesystem()->create_directory(query_gc_dir); DBUG_EXECUTE_IF("fault_inject::spill_stream::gc", { @@ -79,10 +94,19 @@ void SpillStream::gc() { } Status SpillStream::prepare() { - writer_ = std::make_unique<SpillWriter>(stream_id_, batch_rows_, data_dir_, spill_dir_); + writer_ = + std::make_unique<SpillWriter>(profile_, stream_id_, batch_rows_, data_dir_, spill_dir_); reader_ = std::make_unique<SpillReader>(stream_id_, writer_->get_file_path()); - return Status::OK(); + + DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", { + return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream prepare_spill failed"); + }); + COUNTER_UPDATE(_total_file_count, 1); + if (_current_file_count) { + COUNTER_UPDATE(_current_file_count, 1); + } + return writer_->open(); } const TUniqueId& SpillStream::query_id() const { @@ -92,12 +116,6 @@ const TUniqueId& SpillStream::query_id() const { const std::string& SpillStream::get_spill_root_dir() const { return data_dir_->path(); } -Status SpillStream::prepare_spill() { - DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", { - return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream prepare_spill failed"); - }); - return writer_->open(); -} Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) { size_t written_bytes = 0; @@ -106,9 +124,7 @@ Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eo }); RETURN_IF_ERROR(writer_->write(state, block, written_bytes)); if (eof) { - RETURN_IF_ERROR(writer_->close()); - total_written_bytes_ = writer_->get_written_bytes(); - writer_.reset(); + RETURN_IF_ERROR(spill_eof()); } else { total_written_bytes_ = writer_->get_written_bytes(); } diff --git 
a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index 26b7dcbaf06..5be151be72c 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -56,32 +56,17 @@ public: int64_t get_written_bytes() const { return total_written_bytes_; } - Status prepare_spill(); - Status spill_block(RuntimeState* state, const Block& block, bool eof); Status spill_eof(); Status read_next_block_sync(Block* block, bool* eos); - void set_write_counters(RuntimeProfile::Counter* serialize_timer, - RuntimeProfile::Counter* write_block_counter, - RuntimeProfile::Counter* write_bytes_counter, - RuntimeProfile::Counter* write_timer, - RuntimeProfile::Counter* wait_io_timer, - RuntimeProfile::Counter* memory_used_counter) { - writer_->set_counters(serialize_timer, write_block_counter, write_bytes_counter, - write_timer, memory_used_counter); - write_wait_io_timer_ = wait_io_timer; - } - - void set_read_counters(RuntimeProfile::Counter* read_timer, - RuntimeProfile::Counter* deserialize_timer, - RuntimeProfile::Counter* read_bytes, - RuntimeProfile::Counter* wait_io_timer) { - reader_->set_counters(read_timer, deserialize_timer, read_bytes); - read_wait_io_timer_ = wait_io_timer; - } + void set_write_counters(RuntimeProfile* profile) { writer_->set_counters(profile); } + + void set_read_counters(RuntimeProfile* profile) { reader_->set_counters(profile); } + + void update_shared_profiles(RuntimeProfile* source_op_profile); const TUniqueId& query_id() const; @@ -93,6 +78,8 @@ private: RuntimeState* state_ = nullptr; int64_t stream_id_; SpillDataDir* data_dir_ = nullptr; + // Directory path format specified in SpillStreamManager::register_spill_stream: + // storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id std::string spill_dir_; size_t batch_rows_; size_t batch_bytes_; @@ -106,8 +93,9 @@ private: TUniqueId query_id_; RuntimeProfile* profile_ = nullptr; - RuntimeProfile::Counter* write_wait_io_timer_ = nullptr; - 
RuntimeProfile::Counter* read_wait_io_timer_ = nullptr; + RuntimeProfile::Counter* _current_file_count = nullptr; + RuntimeProfile::Counter* _total_file_count = nullptr; + RuntimeProfile::Counter* _current_file_size = nullptr; }; using SpillStreamSPtr = std::shared_ptr<SpillStream>; } // namespace vectorized diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index 61e96559d23..c4641866dcf 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -168,6 +168,7 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea SpillDataDir* data_dir = nullptr; for (auto& dir : data_dirs) { std::string spill_root_dir = dir->get_spill_data_path(); + // storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id spill_dir = fmt::format("{}/{}/{}-{}-{}-{}", spill_root_dir, query_id, operator_name, node_id, state->task_id(), id); auto st = io::global_local_filesystem()->create_directory(spill_dir); diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp index 9fbd81601b6..34715787756 100644 --- a/be/src/vec/spill/spill_writer.cpp +++ b/be/src/vec/spill/spill_writer.cpp @@ -45,12 +45,12 @@ Status SpillWriter::close() { // meta: block1 offset, block2 offset, ..., blockn offset, max_sub_block_size, n { - SCOPED_TIMER(write_timer_); + SCOPED_TIMER(_write_file_timer); RETURN_IF_ERROR(file_writer_->append(meta_)); } total_written_bytes_ += meta_.size(); - COUNTER_UPDATE(write_bytes_counter_, meta_.size()); + COUNTER_UPDATE(_write_file_data_bytes_counter, meta_.size()); data_dir_->update_spill_data_usage(meta_.size()); @@ -64,6 +64,8 @@ Status SpillWriter::write(RuntimeState* state, const Block& block, size_t& writt written_bytes = 0; DCHECK(file_writer_); auto rows = block.rows(); + COUNTER_UPDATE(_write_rows_counter, rows); + COUNTER_UPDATE(_write_block_bytes_counter, block.bytes()); // file format: block1, block2, ..., 
blockn, meta if (rows <= batch_size_) { return _write_internal(block, written_bytes); @@ -84,9 +86,10 @@ Status SpillWriter::write(RuntimeState* state, const Block& block, size_t& writt } }); - auto tmp_blcok_mem = tmp_block.allocated_bytes(); - memory_used_counter_->update(tmp_blcok_mem); - Defer defer {[&]() { memory_used_counter_->update(-tmp_blcok_mem); }}; + int64_t tmp_blcok_mem = tmp_block.allocated_bytes(); + COUNTER_UPDATE(_memory_used_counter, tmp_blcok_mem); + COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value()); + Defer defer {[&]() { COUNTER_UPDATE(_memory_used_counter, -tmp_blcok_mem); }}; RETURN_IF_ERROR(_write_internal(tmp_block, written_bytes)); row_idx += block_rows; @@ -100,26 +103,31 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) { Status status; std::string buff; + int64_t buff_size {0}; if (block.rows() > 0) { { PBlock pblock; - SCOPED_TIMER(serialize_timer_); + SCOPED_TIMER(_serialize_timer); status = block.serialize( BeExecVersionManager::get_newest_version(), &pblock, &uncompressed_bytes, &compressed_bytes, segment_v2::CompressionTypePB::ZSTD); // ZSTD for better compression ratio RETURN_IF_ERROR(status); - auto pblock_mem = pblock.ByteSizeLong(); - memory_used_counter_->update(pblock_mem); - Defer defer {[&]() { memory_used_counter_->update(-pblock_mem); }}; + int64_t pblock_mem = pblock.ByteSizeLong(); + COUNTER_UPDATE(_memory_used_counter, pblock_mem); + COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value()); + Defer defer {[&]() { COUNTER_UPDATE(_memory_used_counter, -pblock_mem); }}; if (!pblock.SerializeToString(&buff)) { return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>( "serialize spill data error. 
[path={}]", file_path_); } - memory_used_counter_->update(buff.size()); + buff_size = buff.size(); + COUNTER_UPDATE(_memory_used_counter, buff_size); + COUNTER_SET(_peak_memory_usage_counter, _memory_used_counter->value()); + Defer defer2 {[&]() { COUNTER_UPDATE(_memory_used_counter, -buff_size); }}; } - if (data_dir_->reach_capacity_limit(buff.size())) { + if (data_dir_->reach_capacity_limit(buff_size)) { return Status::Error<ErrorCode::DISK_REACH_CAPACITY_LIMIT>( "spill data total size exceed limit, path: {}, size limit: {}, spill data " "size: {}", @@ -129,31 +137,29 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) { } { - auto buff_size = buff.size(); Defer defer {[&]() { - memory_used_counter_->update(-buff_size); if (status.ok()) { data_dir_->update_spill_data_usage(buff_size); written_bytes += buff_size; - max_sub_block_size_ = std::max(max_sub_block_size_, buff_size); + max_sub_block_size_ = std::max(max_sub_block_size_, (size_t)buff_size); meta_.append((const char*)&total_written_bytes_, sizeof(size_t)); - COUNTER_UPDATE(write_bytes_counter_, buff_size); - COUNTER_UPDATE(write_block_counter_, 1); + COUNTER_UPDATE(_write_file_data_bytes_counter, buff_size); + COUNTER_UPDATE(_write_block_counter, 1); total_written_bytes_ += buff_size; ++written_blocks_; } }}; { - SCOPED_TIMER(write_timer_); + SCOPED_TIMER(_write_file_timer); status = file_writer_->append(buff); RETURN_IF_ERROR(status); } } } - return Status::OK(); + return status; } } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h index c3502b5d6a4..a6ea3200b14 100644 --- a/be/src/vec/spill/spill_writer.h +++ b/be/src/vec/spill/spill_writer.h @@ -31,9 +31,16 @@ namespace vectorized { class SpillDataDir; class SpillWriter { public: - SpillWriter(int64_t id, size_t batch_size, SpillDataDir* data_dir, const std::string& dir) + SpillWriter(RuntimeProfile* profile, int64_t id, size_t 
batch_size, SpillDataDir* data_dir, + const std::string& dir) : data_dir_(data_dir), stream_id_(id), batch_size_(batch_size) { - file_path_ = dir + "/" + std::to_string(file_index_); + // Directory path format specified in SpillStreamManager::register_spill_stream: + // storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id/0 + file_path_ = dir + "/0"; + + _memory_used_counter = profile->get_counter("MemoryUsage"); + _peak_memory_usage_counter = static_cast<RuntimeProfile::HighWaterMarkCounter*>( + profile->get_counter("MemoryUsagePeak")); } Status open(); @@ -48,21 +55,16 @@ public: const std::string& get_file_path() const { return file_path_; } - void set_counters(RuntimeProfile::Counter* serialize_timer, - RuntimeProfile::Counter* write_block_counter, - RuntimeProfile::Counter* write_bytes_counter, - RuntimeProfile::Counter* write_timer, - RuntimeProfile::Counter* memory_used_counter) { - serialize_timer_ = serialize_timer; - write_block_counter_ = write_block_counter; - write_bytes_counter_ = write_bytes_counter; - write_timer_ = write_timer; - memory_used_counter_ = memory_used_counter; + void set_counters(RuntimeProfile* profile) { + _write_file_timer = profile->get_counter("SpillWriteFileTime"); + _serialize_timer = profile->get_counter("SpillWriteSerializeBlockTime"); + _write_block_counter = profile->get_counter("SpillWriteBlockCount"); + _write_block_bytes_counter = profile->get_counter("SpillWriteBlockDataSize"); + _write_file_data_bytes_counter = profile->get_counter("SpillWriteFileTotalSize"); + _write_rows_counter = profile->get_counter("SpillWriteRows"); } private: - void _init_profile(); - Status _write_internal(const Block& block, size_t& written_bytes); // not owned, point to the data dir of this rowset @@ -72,7 +74,6 @@ private: int64_t stream_id_; size_t batch_size_; size_t max_sub_block_size_ = 0; - int file_index_ = 0; std::string file_path_; std::unique_ptr<doris::io::FileWriter> file_writer_; @@ -80,11 +81,14 @@ private: 
int64_t total_written_bytes_ = 0; std::string meta_; - RuntimeProfile::Counter* write_bytes_counter_ = nullptr; - RuntimeProfile::Counter* serialize_timer_ = nullptr; - RuntimeProfile::Counter* write_timer_ = nullptr; - RuntimeProfile::Counter* write_block_counter_ = nullptr; - RuntimeProfile::Counter* memory_used_counter_ = nullptr; + RuntimeProfile::Counter* _write_file_timer = nullptr; + RuntimeProfile::Counter* _serialize_timer = nullptr; + RuntimeProfile::Counter* _write_block_counter = nullptr; + RuntimeProfile::Counter* _write_block_bytes_counter = nullptr; + RuntimeProfile::Counter* _write_file_data_bytes_counter = nullptr; + RuntimeProfile::Counter* _write_rows_counter = nullptr; + RuntimeProfile::Counter* _memory_used_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr; }; using SpillWriterUPtr = std::unique_ptr<SpillWriter>; } // namespace vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org