This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 9df53b29b34 [improvement](spill) add counter in fe audit log
9df53b29b34 is described below
commit 9df53b29b349bdb1a8c766c47f98b58a4091ce50
Author: jacktengg <[email protected]>
AuthorDate: Tue Oct 29 11:34:25 2024 +0800
[improvement](spill) add counter in fe audit log
---
be/src/pipeline/dependency.h | 4 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 +-
be/src/pipeline/exec/operator.h | 47 +++++++++++++++++-----
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 2 +-
be/src/runtime/query_statistics.cpp | 17 ++++++++
be/src/runtime/query_statistics.h | 19 ++++++++-
.../workload_group/workload_group_manager.cpp | 6 +--
be/src/vec/spill/spill_reader.h | 4 +-
be/src/vec/spill/spill_stream.cpp | 4 +-
be/src/vec/spill/spill_writer.h | 6 +--
.../java/org/apache/doris/plugin/AuditEvent.java | 28 +++++++++++++
.../java/org/apache/doris/qe/AuditLogHelper.java | 4 ++
.../WorkloadRuntimeStatusMgr.java | 8 ++++
gensrc/proto/data.proto | 4 ++
gensrc/thrift/FrontendService.thrift | 4 ++
15 files changed, 133 insertions(+), 26 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 1a89ca4f02c..211ede2c455 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -432,7 +432,7 @@ struct BasicSpillSharedState {
AtomicStatus _spill_status;
// These two counters are shared to spill source operators as the initial
value
- // of 'SpillWriteFileCurrentSize' and 'SpillWriteFileCurrentCount'.
+ // of 'SpillWriteFileCurrentBytes' and 'SpillWriteFileCurrentCount'.
// Total bytes of spill data written to disk file(after serialized)
RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
RuntimeProfile::Counter* _spill_file_total_count = nullptr;
@@ -441,7 +441,7 @@ struct BasicSpillSharedState {
_spill_file_total_count =
ADD_COUNTER_WITH_LEVEL(sink_profile,
"SpillWriteFileTotalCount", TUnit::UNIT, 1);
_spill_write_file_total_size =
- ADD_COUNTER_WITH_LEVEL(sink_profile,
"SpillWriteFileTotalSize", TUnit::BYTES, 1);
+ ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileBytes",
TUnit::BYTES, 1);
}
virtual void update_spill_stream_profiles(RuntimeProfile* source_profile)
= 0;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 79c068a83c1..a334f57859b 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -131,7 +131,7 @@ size_t
HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bo
const auto estimated_size_of_next_block = bytes_per_row *
state->batch_size();
// If the new size is greater than 95% of allocalted bytes, it maybe
need to realloc.
- if (((estimated_size_of_next_block + bytes) * 100 / allocated_bytes)
>= 95) {
+ if (((estimated_size_of_next_block + bytes) * 100 / allocated_bytes)
>= 85) {
size_to_reserve += bytes + estimated_size_of_next_block;
}
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index b93dc925a1b..7c0bd44a664 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -294,6 +294,20 @@ public:
return Status::OK();
}
+ Status close(RuntimeState* state) override {
+ if (Base::_query_statistics) {
+ auto* write_block_bytes =
Base::profile()->get_counter("SpillWriteBlockBytes");
+ auto* write_file_bytes =
Base::profile()->get_counter("SpillWriteFileBytes");
+ auto* read_block_bytes =
Base::profile()->get_counter("SpillReadBlockBytes");
+ auto* read_file_bytes =
Base::profile()->get_counter("SpillReadFileBytes");
+ Base::_query_statistics->add_spill_bytes(
+ write_block_bytes ? write_block_bytes->value() : 0,
+ write_file_bytes ? write_file_bytes->value() : 0,
read_block_bytes->value(),
+ read_file_bytes->value());
+ }
+ return Base::close(state);
+ }
+
void init_spill_write_counters() {
_spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteTime", 1);
@@ -311,9 +325,9 @@ public:
_spill_write_block_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockCount", TUnit::UNIT, 1);
_spill_write_block_data_size =
- ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockDataSize", TUnit::BYTES, 1);
+ ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockBytes", TUnit::BYTES, 1);
_spill_write_file_total_size =
- ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteFileTotalSize", TUnit::BYTES, 1);
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileBytes",
TUnit::BYTES, 1);
_spill_write_rows_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows",
TUnit::UNIT, 1);
_spill_file_total_count =
@@ -338,23 +352,23 @@ public:
_spill_read_block_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockCount",
TUnit::UNIT, 1);
_spill_read_block_data_size =
- ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillReadBlockDataSize", TUnit::BYTES, 1);
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockBytes",
TUnit::BYTES, 1);
_spill_read_file_size =
- ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileSize",
TUnit::BYTES, 1);
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileBytes",
TUnit::BYTES, 1);
_spill_read_rows_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadRows",
TUnit::UNIT, 1);
_spill_read_file_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileCount",
TUnit::UNIT, 1);
_spill_file_current_size = ADD_COUNTER_WITH_LEVEL(
- Base::profile(), "SpillWriteFileCurrentSize", TUnit::BYTES, 1);
+ Base::profile(), "SpillWriteFileCurrentBytes", TUnit::BYTES,
1);
_spill_file_current_count = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
}
// These two counters are shared to spill source operators as the initial
value
- // Initialize values of counters 'SpillWriteFileCurrentSize' and
'SpillWriteFileCurrentCount'
- // from spill sink operators' "SpillWriteFileTotalCount" and
"SpillWriteFileTotalSize"
+ // Initialize values of counters 'SpillWriteFileCurrentBytes' and
'SpillWriteFileCurrentCount'
+ // from spill sink operators' "SpillWriteFileTotalCount" and
"SpillWriteFileBytes"
void copy_shared_spill_profile() {
if (_copy_shared_spill_profile) {
_copy_shared_spill_profile = false;
@@ -662,9 +676,6 @@ protected:
int _nereids_id = -1;
std::vector<int> _dests_id;
std::string _name;
-
- // Maybe this will be transferred to BufferControlBlock.
- std::shared_ptr<QueryStatistics> _query_statistics;
};
template <typename LocalStateType>
@@ -718,7 +729,7 @@ public:
_spill_write_block_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockCount", TUnit::UNIT, 1);
_spill_write_block_data_size =
- ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockDataSize", TUnit::BYTES, 1);
+ ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockBytes", TUnit::BYTES, 1);
_spill_write_rows_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows",
TUnit::UNIT, 1);
@@ -729,6 +740,20 @@ public:
return Status::OK();
}
+ Status close(RuntimeState* state, Status exec_status) override {
+ if (Base::_query_statistics) {
+ auto* write_block_bytes =
Base::profile()->get_counter("SpillWriteBlockBytes");
+ auto* write_file_bytes =
Base::profile()->get_counter("SpillWriteFileBytes");
+ auto* read_block_bytes =
Base::profile()->get_counter("SpillReadBlockBytes");
+ auto* read_file_bytes =
Base::profile()->get_counter("SpillReadFileBytes");
+ Base::_query_statistics->add_spill_bytes(
+ write_block_bytes->value(), write_file_bytes->value(),
+ read_block_bytes ? read_block_bytes->value() : 0,
+ read_file_bytes ? read_file_bytes->value() : 0);
+ }
+ return Base::close(state, exec_status);
+ }
+
std::vector<Dependency*> dependencies() const override {
auto dependencies = Base::dependencies();
return dependencies;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 48a1b2b6ec3..03e9f33553e 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -82,7 +82,7 @@ void SpillSortSinkLocalState::update_profile(RuntimeProfile*
child_profile) {
Status SpillSortSinkLocalState::close(RuntimeState* state, Status
execsink_status) {
dec_running_big_mem_op_num(state);
- return Status::OK();
+ return Base::close(state, execsink_status);
}
Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
diff --git a/be/src/runtime/query_statistics.cpp
b/be/src/runtime/query_statistics.cpp
index 110efef5ab9..0ff3800aa16 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -44,6 +44,11 @@ void QueryStatistics::merge(const QueryStatistics& other) {
if (other_memory_used > 0) {
this->current_used_memory_bytes = other_memory_used;
}
+
+ _spill_write_block_bytes += other._spill_write_block_bytes;
+ _spill_write_file_bytes += other._spill_write_file_bytes;
+ _spill_read_block_bytes += other._spill_read_block_bytes;
+ _spill_read_file_bytes += other._spill_read_file_bytes;
}
void QueryStatistics::to_pb(PQueryStatistics* statistics) {
@@ -55,6 +60,10 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
+ statistics->set_spill_write_block_bytes(_spill_write_block_bytes);
+ statistics->set_spill_write_file_bytes(_spill_write_file_bytes);
+ statistics->set_spill_read_block_bytes(_spill_read_block_bytes);
+ statistics->set_spill_read_file_bytes(_spill_read_file_bytes);
}
void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
@@ -69,6 +78,10 @@ void QueryStatistics::to_thrift(TQueryStatistics*
statistics) const {
statistics->__set_shuffle_send_rows(shuffle_send_rows);
statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
+ statistics->__set_spill_write_block_bytes(_spill_write_block_bytes);
+ statistics->__set_spill_write_file_bytes(_spill_write_file_bytes);
+ statistics->__set_spill_read_block_bytes(_spill_read_block_bytes);
+ statistics->__set_spill_read_file_bytes(_spill_read_file_bytes);
}
void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
@@ -77,6 +90,10 @@ void QueryStatistics::from_pb(const PQueryStatistics&
statistics) {
cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS;
_scan_bytes_from_local_storage =
statistics.scan_bytes_from_local_storage();
_scan_bytes_from_remote_storage =
statistics.scan_bytes_from_remote_storage();
+ _spill_write_block_bytes = statistics.spill_write_block_bytes();
+ _spill_write_file_bytes = statistics.spill_write_file_bytes();
+ _spill_read_block_bytes = statistics.spill_read_block_bytes();
+ _spill_read_file_bytes = statistics.spill_read_file_bytes();
}
QueryStatistics::~QueryStatistics() {}
diff --git a/be/src/runtime/query_statistics.h
b/be/src/runtime/query_statistics.h
index 0a19dfd46f0..affb006fa66 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -47,7 +47,11 @@ public:
max_peak_memory_bytes(0),
current_used_memory_bytes(0),
shuffle_send_bytes(0),
- shuffle_send_rows(0) {}
+ shuffle_send_rows(0),
+ _spill_write_block_bytes(0),
+ _spill_write_file_bytes(0),
+ _spill_read_block_bytes(0),
+ _spill_read_file_bytes(0) {}
virtual ~QueryStatistics();
void merge(const QueryStatistics& other);
@@ -80,6 +84,14 @@ public:
current_used_memory_bytes = current_used_memory;
}
+ void add_spill_bytes(int64_t spill_write_block_bytes, int64_t
spill_write_file_bytes,
+ int64_t spill_read_block_bytes, int64_t
spill_read_file_bytes) {
+ _spill_write_block_bytes += spill_write_block_bytes;
+ _spill_write_file_bytes += spill_write_file_bytes;
+ _spill_read_block_bytes += spill_read_block_bytes;
+ _spill_read_file_bytes += spill_read_file_bytes;
+ }
+
void to_pb(PQueryStatistics* statistics);
void to_thrift(TQueryStatistics* statistics) const;
void from_pb(const PQueryStatistics& statistics);
@@ -106,6 +118,11 @@ private:
std::atomic<int64_t> shuffle_send_bytes;
std::atomic<int64_t> shuffle_send_rows;
+
+ std::atomic<int64_t> _spill_write_block_bytes;
+ std::atomic<int64_t> _spill_write_file_bytes;
+ std::atomic<int64_t> _spill_read_block_bytes;
+ std::atomic<int64_t> _spill_read_file_bytes;
};
using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
// It is used for collecting sub plan query statistics in DataStreamRecvr.
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 91a1438d2fb..3d820293694 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -617,10 +617,10 @@ bool
WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_
} else {
// Use MEM_LIMIT_EXCEEDED so that FE could parse the error
code and do try logic
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
- "query({}) reserve memory failed, but could not find
memory that "
+ "query({}) reserve memory failed, but could not find
memory that "
"could "
- "release or spill to disk(usage:{}, limit: {})",
- query_id, memory_usage, query_ctx->get_mem_limit()));
+ "release or spill to disk(memory usage:{}, limit: {})",
+ query_id, PrettyPrinter::print_bytes(memory_usage),
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())));
}
} else {
if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) {
diff --git a/be/src/vec/spill/spill_reader.h b/be/src/vec/spill/spill_reader.h
index fcb7d8d9e0b..edf2d6d8834 100644
--- a/be/src/vec/spill/spill_reader.h
+++ b/be/src/vec/spill/spill_reader.h
@@ -54,8 +54,8 @@ public:
_read_file_timer = profile->get_counter("SpillReadFileTime");
_deserialize_timer =
profile->get_counter("SpillReadDerializeBlockTime");
_read_block_count = profile->get_counter("SpillReadBlockCount");
- _read_block_data_size = profile->get_counter("SpillReadBlockDataSize");
- _read_file_size = profile->get_counter("SpillReadFileSize");
+ _read_block_data_size = profile->get_counter("SpillReadBlockBytes");
+ _read_file_size = profile->get_counter("SpillReadFileBytes");
_read_rows_count = profile->get_counter("SpillReadRows");
_read_file_count = profile->get_counter("SpillReadFileCount");
}
diff --git a/be/src/vec/spill/spill_stream.cpp
b/be/src/vec/spill/spill_stream.cpp
index 14f200a92e5..73624bfcd4a 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -47,12 +47,12 @@ SpillStream::SpillStream(RuntimeState* state, int64_t
stream_id, SpillDataDir* d
profile_(profile) {
_total_file_count = profile_->get_counter("SpillWriteFileTotalCount");
_current_file_count = profile_->get_counter("SpillWriteFileCurrentCount");
- _current_file_size = profile_->get_counter("SpillWriteFileCurrentSize");
+ _current_file_size = profile_->get_counter("SpillWriteFileCurrentBytes");
}
void SpillStream::update_shared_profiles(RuntimeProfile* source_op_profile) {
_current_file_count =
source_op_profile->get_counter("SpillWriteFileCurrentCount");
- _current_file_size =
source_op_profile->get_counter("SpillWriteFileCurrentSize");
+ _current_file_size =
source_op_profile->get_counter("SpillWriteFileCurrentBytes");
}
SpillStream::~SpillStream() {
diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h
index 372ce21bb17..467ad57acd9 100644
--- a/be/src/vec/spill/spill_writer.h
+++ b/be/src/vec/spill/spill_writer.h
@@ -59,9 +59,9 @@ public:
_write_file_timer = profile->get_counter("SpillWriteFileTime");
_serialize_timer =
profile->get_counter("SpillWriteSerializeBlockTime");
_write_block_counter = profile->get_counter("SpillWriteBlockCount");
- _write_block_bytes_counter =
profile->get_counter("SpillWriteBlockDataSize");
- _write_file_total_size =
profile->get_counter("SpillWriteFileTotalSize");
- _write_file_current_size =
profile->get_counter("SpillWriteFileCurrentSize");
+ _write_block_bytes_counter =
profile->get_counter("SpillWriteBlockBytes");
+ _write_file_total_size = profile->get_counter("SpillWriteFileBytes");
+ _write_file_current_size =
profile->get_counter("SpillWriteFileCurrentBytes");
_write_rows_counter = profile->get_counter("SpillWriteRows");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 55a8b00d2e8..7b8c8062243 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -113,6 +113,14 @@ public class AuditEvent {
public long scanBytesFromLocalStorage = -1;
@AuditField(value = "scanBytesFromRemoteStorage")
public long scanBytesFromRemoteStorage = -1;
+ @AuditField(value = "SpillWriteBlockBytes")
+ public long spillWriteBlockBytes = -1;
+ @AuditField(value = "SpillWriteFileBytes")
+ public long spillWriteFileBytes = -1;
+ @AuditField(value = "SpillReadBlockBytes")
+ public long spillReadBlockBytes = -1;
+ @AuditField(value = "SpillReadFileBytes")
+ public long spillReadFileBytes = -1;
public long pushToAuditLogQueueTime;
@@ -272,6 +280,26 @@ public class AuditEvent {
return this;
}
+ public AuditEventBuilder setSpillWriteBlockBytes(long
spillWriteBlockBytes) {
+ auditEvent.spillWriteBlockBytes = spillWriteBlockBytes;
+ return this;
+ }
+
+ public AuditEventBuilder setSpillWriteFileBytes(long
spillWriteFileBytes) {
+ auditEvent.spillWriteFileBytes = spillWriteFileBytes;
+ return this;
+ }
+
+ public AuditEventBuilder setSpillReadBlockBytes(long
spillReadBlockBytes) {
+ auditEvent.spillReadBlockBytes = spillReadBlockBytes;
+ return this;
+ }
+
+ public AuditEventBuilder setSpillReadFileBytes(long
spillReadFileBytes) {
+ auditEvent.spillReadFileBytes = spillReadFileBytes;
+ return this;
+ }
+
public AuditEvent build() {
return this.auditEvent;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index c9016940c0d..8ee32ce8e60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -201,6 +201,10 @@ public class AuditLogHelper {
.setQueryTime(elapseMs)
.setScanBytes(statistics == null ? 0 :
statistics.getScanBytes())
.setScanRows(statistics == null ? 0 : statistics.getScanRows())
+ .setSpillWriteBlockBytes(statistics == null ? 0 :
statistics.getSpillWriteBlockBytes())
+ .setSpillWriteFileBytes(statistics == null ? 0 :
statistics.getSpillWriteFileBytes())
+ .setSpillReadBlockBytes(statistics == null ? 0 :
statistics.getSpillReadBlockBytes())
+ .setSpillReadFileBytes(statistics == null ? 0 :
statistics.getSpillReadFileBytes())
.setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
.setPeakMemoryBytes(statistics == null ? 0 :
statistics.getMaxPeakMemoryBytes())
.setReturnRows(ctx.getReturnRows())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index 695bf983dc6..796a268706d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -86,6 +86,10 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
auditEvent.cpuTimeMs = queryStats.cpu_ms;
auditEvent.shuffleSendBytes =
queryStats.shuffle_send_bytes;
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
+ auditEvent.spillWriteBlockBytes =
queryStats.spill_write_block_bytes;
+ auditEvent.spillWriteFileBytes =
queryStats.spill_write_file_bytes;
+ auditEvent.spillReadBlockBytes =
queryStats.spill_read_block_bytes;
+ auditEvent.spillReadFileBytes =
queryStats.spill_read_file_bytes;
}
boolean ret =
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true);
if (!ret) {
@@ -226,6 +230,10 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon
{
if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
}
+ dst.spill_write_block_bytes += src.spill_write_block_bytes;
+ dst.spill_write_file_bytes += src.spill_write_file_bytes;
+ dst.spill_read_block_bytes += src.spill_read_block_bytes;
+ dst.spill_read_file_bytes += src.spill_read_file_bytes;
}
private void queryAuditEventLogWriteLock() {
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index 9b3824db3dc..0372634499c 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -37,6 +37,10 @@ message PQueryStatistics {
repeated PNodeStatistics nodes_statistics = 6;
optional int64 scan_bytes_from_local_storage = 7;
optional int64 scan_bytes_from_remote_storage = 8;
+ optional int64 spill_write_block_bytes = 9;
+ optional int64 spill_write_file_bytes = 10;
+ optional int64 spill_read_block_bytes = 11;
+ optional int64 spill_read_file_bytes = 12;
}
message PRowBatch {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 004cf3b050c..cfcfdba5e73 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -413,6 +413,10 @@ struct TQueryStatistics {
9: optional i64 shuffle_send_rows
10: optional i64 scan_bytes_from_local_storage
11: optional i64 scan_bytes_from_remote_storage
+ 12: optional i64 spill_write_block_bytes
+ 13: optional i64 spill_write_file_bytes
+ 14: optional i64 spill_read_block_bytes
+ 15: optional i64 spill_read_file_bytes
}
struct TReportWorkloadRuntimeStatusParams {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]