This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new da1cd324ae0 (cloud-merge) Add file cache stats for queries to audit log (#34140)
da1cd324ae0 is described below
commit da1cd324ae0d74b3241d5893a857a91bb578a70b
Author: Lightman <[email protected]>
AuthorDate: Sat Apr 27 13:54:41 2024 +0800
(cloud-merge) Add file cache stats for queries to audit log (#34140)
---
be/src/runtime/query_statistics.cpp | 10 ++++++++++
be/src/runtime/query_statistics.h | 12 ++++++++++++
be/src/vec/exec/scan/new_olap_scanner.cpp | 6 ++++++
be/test/vec/olap/vertical_compaction_test.cpp | 4 ----
.../java/org/apache/doris/plugin/audit/AuditEvent.java | 14 ++++++++++++++
.../src/main/java/org/apache/doris/qe/AuditLogHelper.java | 6 +++++-
.../workloadschedpolicy/WorkloadRuntimeStatusMgr.java | 4 ++++
gensrc/proto/data.proto | 2 ++
gensrc/thrift/FrontendService.thrift | 2 ++
9 files changed, 55 insertions(+), 5 deletions(-)
diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp
index de950704180..551f518f22f 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -32,6 +32,10 @@ void QueryStatistics::merge(const QueryStatistics& other) {
cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed);
shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed);
+ _scan_bytes_from_local_storage +=
+ other._scan_bytes_from_local_storage.load(std::memory_order_relaxed);
+ _scan_bytes_from_remote_storage +=
+ other._scan_bytes_from_remote_storage.load(std::memory_order_relaxed);
int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed);
if (other_peak_mem > this->max_peak_memory_bytes) {
@@ -51,6 +55,8 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
statistics->set_returned_rows(returned_rows);
statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
+ statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
+ statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
}
void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
@@ -64,12 +70,16 @@ void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
current_used_memory_bytes.load(std::memory_order_relaxed));
statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed));
statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed));
+ statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
+ statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
}
void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
scan_rows = statistics.scan_rows();
scan_bytes = statistics.scan_bytes();
cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS;
+ _scan_bytes_from_local_storage = statistics.scan_bytes_from_local_storage();
+ _scan_bytes_from_remote_storage = statistics.scan_bytes_from_remote_storage();
}
void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h
index a9f6e192ec0..0a1c5c9f7ba 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -73,6 +73,14 @@ public:
this->shuffle_send_rows.fetch_add(delta_rows, std::memory_order_relaxed);
}
+ void add_scan_bytes_from_local_storage(int64_t scan_bytes_from_local_storage) {
+ this->_scan_bytes_from_local_storage += scan_bytes_from_local_storage;
+ }
+
+ void add_scan_bytes_from_remote_storage(int64_t scan_bytes_from_remote_storage) {
+ this->_scan_bytes_from_remote_storage += scan_bytes_from_remote_storage;
+ }
+
void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; }
void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) {
@@ -95,6 +103,8 @@ public:
cpu_nanos.store(0, std::memory_order_relaxed);
shuffle_send_bytes.store(0, std::memory_order_relaxed);
shuffle_send_rows.store(0, std::memory_order_relaxed);
+ _scan_bytes_from_local_storage.store(0);
+ _scan_bytes_from_remote_storage.store(0);
returned_rows = 0;
max_peak_memory_bytes.store(0, std::memory_order_relaxed);
@@ -120,6 +130,8 @@ private:
std::atomic<int64_t> scan_rows;
std::atomic<int64_t> scan_bytes;
std::atomic<int64_t> cpu_nanos;
+ std::atomic<int64_t> _scan_bytes_from_local_storage;
+ std::atomic<int64_t> _scan_bytes_from_remote_storage;
// number rows returned by query.
// only set once by result sink when closing.
int64_t returned_rows;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index c0bef6b3d8a..f65f814f569 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -688,6 +688,12 @@ void NewOlapScanner::_collect_profile_before_close() {
tablet->query_scan_bytes->increment(_compressed_bytes_read);
tablet->query_scan_rows->increment(_raw_rows_read);
tablet->query_scan_count->increment(1);
+ if (_query_statistics) {
+ _query_statistics->add_scan_bytes_from_local_storage(
+ stats.file_cache_stats.bytes_read_from_local);
+ _query_statistics->add_scan_bytes_from_remote_storage(
+ stats.file_cache_stats.bytes_read_from_remote);
+ }
}
} // namespace doris::vectorized
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp
index a98f9c2944c..a4feb3db535 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -97,10 +97,6 @@ protected:
auto engine = std::make_unique<StorageEngine>(options);
engine_ref = engine.get();
ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
- io::FileCacheSettings cache_setting;
- ASSERT_TRUE(io::FileCacheFactory::instance()
- ->create_file_cache(absolute_dir + "/tablet_path", cache_setting)
- .ok());
_data_dir = new DataDir(*engine_ref, absolute_dir, 100000000);
static_cast<void>(_data_dir->init());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
index 6a5fe19fcc6..ac68b38e258 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
@@ -107,6 +107,10 @@ public class AuditEvent {
// note: newly added fields should be always before fuzzyVariables
@AuditField(value = "FuzzyVariables")
public String fuzzyVariables = "";
+ @AuditField(value = "scanBytesFromLocalStorage")
+ public long scanBytesFromLocalStorage = -1;
+ @AuditField(value = "scanBytesFromRemoteStorage")
+ public long scanBytesFromRemoteStorage = -1;
public long pushToAuditLogQueueTime;
@@ -251,6 +255,16 @@ public class AuditEvent {
return this;
}
+ public AuditEventBuilder setScanBytesFromLocalStorage(long scanBytesFromLocalStorage) {
+ auditEvent.scanBytesFromLocalStorage = scanBytesFromLocalStorage;
+ return this;
+ }
+
+ public AuditEventBuilder setScanBytesFromRemoteStorage(long scanBytesFromRemoteStorage) {
+ auditEvent.scanBytesFromRemoteStorage = scanBytesFromRemoteStorage;
+ return this;
+ }
+
public AuditEvent build() {
return this.auditEvent;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 17a0abe3468..edad7780512 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -111,7 +111,11 @@ public class AuditLogHelper {
auditEventBuilder.setSqlDigest(sqlDigest);
}
}
- auditEventBuilder.setIsQuery(true);
+ auditEventBuilder.setIsQuery(true)
+ .setScanBytesFromLocalStorage(
+ statistics == null ? 0 : statistics.getScanBytesFromLocalStorage())
+ .setScanBytesFromRemoteStorage(
+ statistics == null ? 0 : statistics.getScanBytesFromRemoteStorage());
} else {
auditEventBuilder.setIsQuery(false);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index ff2641d5f3c..983643ec49b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -68,6 +68,8 @@ public class WorkloadRuntimeStatusMgr {
if (queryStats != null) {
auditEvent.scanRows = queryStats.scan_rows;
auditEvent.scanBytes = queryStats.scan_bytes;
+ auditEvent.scanBytesFromLocalStorage = queryStats.scan_bytes_from_local_storage;
+ auditEvent.scanBytesFromRemoteStorage = queryStats.scan_bytes_from_remote_storage;
auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
auditEvent.cpuTimeMs = queryStats.cpu_ms;
auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
@@ -176,6 +178,8 @@ public class WorkloadRuntimeStatusMgr {
private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
dst.scan_rows += src.scan_rows;
dst.scan_bytes += src.scan_bytes;
+ dst.scan_bytes_from_local_storage += src.scan_bytes_from_local_storage;
+ dst.scan_bytes_from_remote_storage += src.scan_bytes_from_remote_storage;
dst.cpu_ms += src.cpu_ms;
dst.shuffle_send_bytes += src.shuffle_send_bytes;
dst.shuffle_send_rows += src.shuffle_send_rows;
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index e9ced523912..755a3a042db 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -35,6 +35,8 @@ message PQueryStatistics {
optional int64 cpu_ms = 4;
optional int64 max_peak_memory_bytes = 5;
repeated PNodeStatistics nodes_statistics = 6;
+ optional int64 scan_bytes_from_local_storage = 7;
+ optional int64 scan_bytes_from_remote_storage = 8;
}
message PRowBatch {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 6ed7c23ec3c..23cc6c19e7b 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -409,6 +409,8 @@ struct TQueryStatistics {
7: optional i64 workload_group_id
8: optional i64 shuffle_send_bytes
9: optional i64 shuffle_send_rows
+ 10: optional i64 scan_bytes_from_local_storage
+ 11: optional i64 scan_bytes_from_remote_storage
}
struct TReportWorkloadRuntimeStatusParams {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]