This is an automated email from the ASF dual-hosted git repository.
lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1cc78fe [Enhancement] Convert metric to Json format (#3635)
1cc78fe is described below
commit 1cc78fe69b3b07487d97592de78eefdf140bde83
Author: lichaoyong <[email protected]>
AuthorDate: Wed May 27 08:49:30 2020 +0800
[Enhancement] Convert metric to Json format (#3635)
Add a JSON format for existing metrics like this.
```
{
"tags":
{
"metric":"thread_pool",
"name":"thrift-server-pool",
"type":"active_thread_num"
},
"unit":"number",
"value":3
}
```
I add a new JsonMetricVisitor to handle the transformation.
It's not to modify existing PrometheusMetricVisitor and
SimpleCoreMetricVisitor.
Also I add
1. A unit item to indicate the metric better
2. Cloning tablet statistics divided by database.
3. Use white space to replace newline in audit.log
---
be/src/exec/tablet_sink.cpp | 3 +
be/src/http/action/metrics_action.cpp | 80 ++++++-
be/src/http/action/stream_load.cpp | 28 +--
be/src/runtime/client_cache.cpp | 4 +-
be/src/runtime/memory/chunk_allocator.cpp | 12 +-
be/src/runtime/runtime_state.h | 14 ++
be/src/runtime/tmp_file_mgr.cc | 2 +-
be/src/util/doris_metrics.cpp | 13 +-
be/src/util/doris_metrics.h | 264 +++++++++++----------
be/src/util/metrics.cpp | 26 +-
be/src/util/metrics.h | 92 +++++--
be/src/util/runtime_profile.cpp | 2 +-
be/src/util/system_metrics.cpp | 55 +++--
be/src/util/thrift_server.cpp | 4 +-
be/test/http/metrics_action_test.cpp | 8 +-
be/test/util/doris_metrics_test.cpp | 54 ++---
be/test/util/new_metrics_test.cpp | 30 +--
be/test/util/system_metrics_test.cpp | 58 ++---
.../java/org/apache/doris/analysis/InsertStmt.java | 2 +
.../org/apache/doris/common/ThreadPoolManager.java | 3 +-
.../common/proc/IncompleteTabletsProcNode.java | 11 +-
.../apache/doris/common/proc/StatisticProcDir.java | 18 +-
.../org/apache/doris/http/rest/MetricsAction.java | 3 +
.../apache/doris/load/loadv2/BrokerLoadJob.java | 3 +
.../load/routineload/RoutineLoadTaskInfo.java | 2 +
.../org/apache/doris/master/ReportHandler.java | 3 +-
.../org/apache/doris/metric/CounterMetric.java | 4 +-
.../apache/doris/metric/DoubleCounterMetric.java | 4 +-
.../java/org/apache/doris/metric/GaugeMetric.java | 4 +-
.../org/apache/doris/metric/GaugeMetricImpl.java | 4 +-
.../org/apache/doris/metric/JsonMetricVisitor.java | 86 +++++++
.../org/apache/doris/metric/LongCounterMetric.java | 4 +-
.../main/java/org/apache/doris/metric/Metric.java | 20 +-
.../java/org/apache/doris/metric/MetricRepo.java | 58 ++---
.../org/apache/doris/metric/MetricVisitor.java | 2 +
.../doris/metric/PrometheusMetricVisitor.java | 8 +
.../doris/metric/SimpleCoreMetricVisitor.java | 10 +-
.../java/org/apache/doris/qe/ConnectProcessor.java | 4 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 2 +
.../apache/doris/service/FrontendServiceImpl.java | 15 +-
.../java/org/apache/doris/task/AgentTaskQueue.java | 15 ++
.../doris/load/loadv2/BrokerLoadJobTest.java | 8 +
.../org/apache/doris/load/loadv2/LoadJobTest.java | 7 +
43 files changed, 711 insertions(+), 338 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 4941bb3..7060a62 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -598,6 +598,9 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch*
input_batch) {
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
state->update_num_rows_load_total(input_batch->num_rows());
+ state->update_num_bytes_load_total(input_batch->total_byte_size());
+
DorisMetrics::instance()->load_rows_total.increment(input_batch->num_rows());
+
DorisMetrics::instance()->load_bytes_total.increment(input_batch->total_byte_size());
RowBatch* batch = input_batch;
if (!_output_expr_ctxs.empty()) {
SCOPED_RAW_TIMER(&_convert_batch_ns);
diff --git a/be/src/http/action/metrics_action.cpp
b/be/src/http/action/metrics_action.cpp
index e37e2d1..b487820 100644
--- a/be/src/http/action/metrics_action.cpp
+++ b/be/src/http/action/metrics_action.cpp
@@ -17,6 +17,10 @@
#include "http/action/metrics_action.h"
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/document.h>
+#include <rapidjson/writer.h>
#include <string>
#include "http/http_request.h"
@@ -36,7 +40,7 @@ public:
std::string to_string() const { return _ss.str(); }
private:
void _visit_simple_metric(
- const std::string& name, const MetricLabels& labels, SimpleMetric*
metric);
+ const std::string& name, const MetricLabels& labels, Metric* metric);
private:
std::stringstream _ss;
};
@@ -88,7 +92,7 @@ void PrometheusMetricsVisitor::visit(const std::string&
prefix,
case MetricType::COUNTER:
case MetricType::GAUGE:
for (auto& it : collector->metrics()) {
- _visit_simple_metric(metric_name, it.first, (SimpleMetric*)
it.second);
+ _visit_simple_metric(metric_name, it.first, (Metric*) it.second);
}
break;
default:
@@ -97,7 +101,7 @@ void PrometheusMetricsVisitor::visit(const std::string&
prefix,
}
void PrometheusMetricsVisitor::_visit_simple_metric(
- const std::string& name, const MetricLabels& labels, SimpleMetric*
metric) {
+ const std::string& name, const MetricLabels& labels, Metric* metric) {
_ss << name;
// labels
if (!labels.empty()) {
@@ -138,20 +142,80 @@ void SimpleCoreMetricsVisitor::visit(const std::string&
prefix,
}
for (auto& it : collector->metrics()) {
- _ss << metric_name << " LONG " << ((SimpleMetric*)
it.second)->to_string()
+ _ss << metric_name << " LONG " << ((Metric*) it.second)->to_string()
<< "\n";
}
}
+class JsonMetricsVisitor : public MetricsVisitor {
+public:
+ JsonMetricsVisitor() {
+ }
+ virtual ~JsonMetricsVisitor() {}
+ void visit(const std::string& prefix, const std::string& name,
+ MetricCollector* collector) override;
+ std::string to_string() {
+ rapidjson::StringBuffer strBuf;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(strBuf);
+ doc.Accept(writer);
+ return strBuf.GetString();
+ }
+
+private:
+ rapidjson::Document doc{rapidjson::kArrayType};
+};
+
+void JsonMetricsVisitor::visit(const std::string& prefix,
+ const std::string& name,
+ MetricCollector* collector) {
+ if (collector->empty() || name.empty()) {
+ return;
+ }
+
+ rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();
+ switch (collector->type()) {
+ case MetricType::COUNTER:
+ case MetricType::GAUGE:
+ for (auto& it : collector->metrics()) {
+ const MetricLabels& labels = it.first;
+ Metric* metric = reinterpret_cast<Metric*>(it.second);
+ rapidjson::Value metric_obj(rapidjson::kObjectType);
+ rapidjson::Value tag_obj(rapidjson::kObjectType);
+ tag_obj.AddMember("metric", rapidjson::Value(name.c_str(),
allocator), allocator);
+ // labels
+ if (!labels.empty()) {
+ for (auto& label : labels.labels) {
+ tag_obj.AddMember(
+ rapidjson::Value(label.name.c_str(),
allocator),
+ rapidjson::Value(label.value.c_str(),
allocator),
+ allocator);
+ }
+ }
+ metric_obj.AddMember("tags", tag_obj, allocator);
+ rapidjson::Value unit_val(unit_name(metric->unit()), allocator);
+ metric_obj.AddMember("unit", unit_val, allocator);
+ metric->write_value(metric_obj, allocator);
+ doc.PushBack(metric_obj, allocator);
+ }
+ break;
+ default:
+ break;
+ }
+}
+
void MetricsAction::handle(HttpRequest* req) {
const std::string& type = req->param("type");
std::string str;
- if (type != "core") {
- PrometheusMetricsVisitor visitor;
+ if (type == "core") {
+ SimpleCoreMetricsVisitor visitor;
+ _metrics->collect(&visitor);
+ str.assign(visitor.to_string());
+ } else if (type == "agent") {
+ JsonMetricsVisitor visitor;
_metrics->collect(&visitor);
str.assign(visitor.to_string());
} else {
- SimpleCoreMetricsVisitor visitor;
+ PrometheusMetricsVisitor visitor;
_metrics->collect(&visitor);
str.assign(visitor.to_string());
}
@@ -160,4 +224,4 @@ void MetricsAction::handle(HttpRequest* req) {
HttpChannel::send_reply(req, str);
}
-}
+} // namespace doris
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index 7d490fe..15979ea 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -59,10 +59,10 @@
namespace doris {
-IntCounter k_streaming_load_requests_total;
-IntCounter k_streaming_load_bytes;
-IntCounter k_streaming_load_duration_ms;
-static IntGauge k_streaming_load_current_processing;
+METRIC_DEFINE_INT_COUNTER(streaming_load_requests_total, MetricUnit::NUMBER);
+METRIC_DEFINE_INT_COUNTER(streaming_load_bytes, MetricUnit::BYTES);
+METRIC_DEFINE_INT_COUNTER(streaming_load_duration_ms,
MetricUnit::MILLISECONDS);
+METRIC_DEFINE_INT_GAUGE(streaming_load_current_processing, MetricUnit::NUMBER);
#ifdef BE_TEST
TStreamLoadPutResult k_stream_load_put_result;
@@ -89,13 +89,13 @@ static bool
is_format_support_streaming(TFileFormatType::type format) {
StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {
DorisMetrics::instance()->metrics()->register_metric("streaming_load_requests_total",
- &k_streaming_load_requests_total);
+ &streaming_load_requests_total);
DorisMetrics::instance()->metrics()->register_metric("streaming_load_bytes",
- &k_streaming_load_bytes);
+ &streaming_load_bytes);
DorisMetrics::instance()->metrics()->register_metric("streaming_load_duration_ms",
- &k_streaming_load_duration_ms);
+ &streaming_load_duration_ms);
DorisMetrics::instance()->metrics()->register_metric("streaming_load_current_processing",
-
&k_streaming_load_current_processing);
+
&streaming_load_current_processing);
}
StreamLoadAction::~StreamLoadAction() {
@@ -131,10 +131,10 @@ void StreamLoadAction::handle(HttpRequest* req) {
HttpChannel::send_reply(req, str);
// update statstics
- k_streaming_load_requests_total.increment(1);
- k_streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000);
- k_streaming_load_bytes.increment(ctx->receive_bytes);
- k_streaming_load_current_processing.increment(-1);
+ streaming_load_requests_total.increment(1);
+ streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000);
+ streaming_load_bytes.increment(ctx->receive_bytes);
+ streaming_load_current_processing.increment(-1);
}
Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
@@ -164,7 +164,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
}
int StreamLoadAction::on_header(HttpRequest* req) {
- k_streaming_load_current_processing.increment(1);
+ streaming_load_current_processing.increment(1);
StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
ctx->ref();
@@ -195,7 +195,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
}
auto str = ctx->to_json();
HttpChannel::send_reply(req, str);
- k_streaming_load_current_processing.increment(-1);
+ streaming_load_current_processing.increment(-1);
return -1;
}
return 0;
diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp
index 3a46ab0..7144684 100644
--- a/be/src/runtime/client_cache.cpp
+++ b/be/src/runtime/client_cache.cpp
@@ -216,12 +216,12 @@ void ClientCacheHelper::init_metrics(MetricRegistry*
metrics, const std::string&
// usage, but ensures that _metrics_enabled is published.
boost::lock_guard<boost::mutex> lock(_lock);
- _used_clients.reset(new IntGauge());
+ _used_clients.reset(new IntGauge(MetricUnit::NUMBER));
metrics->register_metric("thrift_used_clients",
MetricLabels().add("name", key_prefix),
_used_clients.get());
- _opened_clients.reset(new IntGauge());
+ _opened_clients.reset(new IntGauge(MetricUnit::NUMBER));
metrics->register_metric("thrift_opened_clients",
MetricLabels().add("name", key_prefix),
_opened_clients.get());
diff --git a/be/src/runtime/memory/chunk_allocator.cpp
b/be/src/runtime/memory/chunk_allocator.cpp
index e331ef3..c5e68f2 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -34,12 +34,12 @@ namespace doris {
ChunkAllocator* ChunkAllocator::_s_instance = nullptr;
-static IntCounter local_core_alloc_count;
-static IntCounter other_core_alloc_count;
-static IntCounter system_alloc_count;
-static IntCounter system_free_count;
-static IntCounter system_alloc_cost_ns;
-static IntCounter system_free_cost_ns;
+static IntCounter local_core_alloc_count(MetricUnit::NUMBER);
+static IntCounter other_core_alloc_count(MetricUnit::NUMBER);
+static IntCounter system_alloc_count(MetricUnit::NUMBER);
+static IntCounter system_free_count(MetricUnit::NUMBER);
+static IntCounter system_alloc_cost_ns(MetricUnit::NANOSECONDS);
+static IntCounter system_free_cost_ns(MetricUnit::NANOSECONDS);
#ifdef BE_TEST
static std::mutex s_mutex;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 876bcc3..c3c4873 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -389,6 +389,10 @@ public:
void append_error_msg_to_file(const std::string& line, const std::string&
error_msg,
bool is_summary = false);
+ int64_t num_bytes_load_total() {
+ return _num_bytes_load_total.load();
+ }
+
int64_t num_rows_load_total() {
return _num_rows_load_total.load();
}
@@ -413,6 +417,14 @@ public:
_num_rows_load_total.store(num_rows);
}
+ void update_num_bytes_load_total(int64_t bytes_load) {
+ _num_bytes_load_total.fetch_add(bytes_load);
+ }
+
+ void set_update_num_bytes_load_total(int64_t bytes_load) {
+ _num_bytes_load_total.store(bytes_load);
+ }
+
void update_num_rows_load_filtered(int64_t num_rows) {
_num_rows_load_filtered.fetch_add(num_rows);
}
@@ -587,6 +599,8 @@ private:
std::atomic<int64_t> _num_rows_load_unselected; // rows filtered by
predicates
std::atomic<int64_t> _num_print_error_rows;
+ std::atomic<int64_t> _num_bytes_load_total; // total bytes read from
source
+
std::vector<std::string> _export_output_files;
std::string _import_label;
diff --git a/be/src/runtime/tmp_file_mgr.cc b/be/src/runtime/tmp_file_mgr.cc
index 87ed68b..badc903 100644
--- a/be/src/runtime/tmp_file_mgr.cc
+++ b/be/src/runtime/tmp_file_mgr.cc
@@ -119,7 +119,7 @@ Status TmpFileMgr::init_custom(
}
DCHECK(metrics != NULL);
- _num_active_scratch_dirs_metric.reset(new IntGauge());
+ _num_active_scratch_dirs_metric.reset(new IntGauge(MetricUnit::NUMBER));
metrics->register_metric("active_scratch_dirs",
_num_active_scratch_dirs_metric.get());
//_active_scratch_dirs_metric = metrics->register_metric(new
SetMetric<std::string>(
// TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST,
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 36c3e3b..2c466e9 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -150,6 +150,11 @@ DorisMetrics::DorisMetrics() : _name("doris_be"),
_hook_name("doris_metrics"), _
_metrics.register_metric(
"stream_load", MetricLabels().add("type", "load_rows"),
&stream_load_rows_total);
+ _metrics.register_metric(
+ "load", MetricLabels().add("type", "receive_bytes"),
+ &stream_receive_bytes_total);
+ _metrics.register_metric("load_rows", &load_rows_total);
+ _metrics.register_metric("load_bytes", &load_bytes_total);
// Gauge
REGISTER_DORIS_METRIC(memory_pool_bytes_total);
@@ -188,13 +193,13 @@ void DorisMetrics::initialize(
const std::vector<std::string>& network_interfaces) {
// disk usage
for (auto& path : paths) {
- IntGauge* gauge = disks_total_capacity.set_key(path);
+ IntGauge* gauge = disks_total_capacity.set_key(path,
MetricUnit::BYTES);
_metrics.register_metric("disks_total_capacity",
MetricLabels().add("path", path), gauge);
- gauge = disks_avail_capacity.set_key(path);
+ gauge = disks_avail_capacity.set_key(path, MetricUnit::BYTES);
_metrics.register_metric("disks_avail_capacity",
MetricLabels().add("path", path), gauge);
- gauge = disks_data_used_capacity.set_key(path);
+ gauge = disks_data_used_capacity.set_key(path, MetricUnit::BYTES);
_metrics.register_metric("disks_data_used_capacity",
MetricLabels().add("path", path), gauge);
- gauge = disks_state.set_key(path);
+ gauge = disks_state.set_key(path, MetricUnit::BYTES);
_metrics.register_metric("disks_state", MetricLabels().add("path",
path), gauge);
}
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 7a3c0bc..e6775b5 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -37,8 +37,8 @@ public:
}
}
- IntGauge* set_key(const std::string& key) {
- metrics.emplace(key, IntGauge());
+ IntGauge* set_key(const std::string& key, const MetricUnit unit) {
+ metrics.emplace(key, IntGauge(unit));
return &metrics.find(key)->second;
}
@@ -54,139 +54,141 @@ private:
class DorisMetrics {
public:
- // counters
- IntCounter fragment_requests_total;
- IntCounter fragment_request_duration_us;
- IntCounter http_requests_total;
- IntCounter http_request_duration_us;
- IntCounter http_request_send_bytes;
- IntCounter query_scan_bytes;
- IntCounter query_scan_rows;
- IntCounter ranges_processed_total;
- IntCounter push_requests_success_total;
- IntCounter push_requests_fail_total;
- IntCounter push_request_duration_us;
- IntCounter push_request_write_bytes;
- IntCounter push_request_write_rows;
- IntCounter create_tablet_requests_total;
- IntCounter create_tablet_requests_failed;
- IntCounter drop_tablet_requests_total;
-
- IntCounter report_all_tablets_requests_total;
- IntCounter report_all_tablets_requests_failed;
- IntCounter report_tablet_requests_total;
- IntCounter report_tablet_requests_failed;
- IntCounter report_disk_requests_total;
- IntCounter report_disk_requests_failed;
- IntCounter report_task_requests_total;
- IntCounter report_task_requests_failed;
-
- IntCounter schema_change_requests_total;
- IntCounter schema_change_requests_failed;
- IntCounter create_rollup_requests_total;
- IntCounter create_rollup_requests_failed;
- IntCounter storage_migrate_requests_total;
- IntCounter delete_requests_total;
- IntCounter delete_requests_failed;
- IntCounter clone_requests_total;
- IntCounter clone_requests_failed;
-
- IntCounter finish_task_requests_total;
- IntCounter finish_task_requests_failed;
-
- IntCounter base_compaction_request_total;
- IntCounter base_compaction_request_failed;
- IntCounter cumulative_compaction_request_total;
- IntCounter cumulative_compaction_request_failed;
-
- IntCounter base_compaction_deltas_total;
- IntCounter base_compaction_bytes_total;
- IntCounter cumulative_compaction_deltas_total;
- IntCounter cumulative_compaction_bytes_total;
-
- IntCounter publish_task_request_total;
- IntCounter publish_task_failed_total;
-
- IntCounter meta_write_request_total;
- IntCounter meta_write_request_duration_us;
- IntCounter meta_read_request_total;
- IntCounter meta_read_request_duration_us;
-
- // Counters for segment_v2
- // -----------------------
- // total number of segments read
- IntCounter segment_read_total;
- // total number of rows in queried segments (before index pruning)
- IntCounter segment_row_total;
- // total number of rows selected by short key index
- IntCounter segment_rows_by_short_key;
- // total number of rows selected by zone map index
- IntCounter segment_rows_read_by_zone_map;
-
- IntCounter txn_begin_request_total;
- IntCounter txn_commit_request_total;
- IntCounter txn_rollback_request_total;
- IntCounter txn_exec_plan_total;
- IntCounter stream_receive_bytes_total;
- IntCounter stream_load_rows_total;
-
- IntCounter memtable_flush_total;
- IntCounter memtable_flush_duration_us;
-
- // Gauges
- IntGauge memory_pool_bytes_total;
- IntGauge process_thread_num;
- IntGauge process_fd_num_used;
- IntGauge process_fd_num_limit_soft;
- IntGauge process_fd_num_limit_hard;
+ // counters
+ METRIC_DEFINE_INT_COUNTER(fragment_requests_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(fragment_request_duration_us,
MetricUnit::MICROSECONDS);
+ METRIC_DEFINE_INT_COUNTER(http_requests_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(http_request_duration_us,
MetricUnit::MICROSECONDS);
+ METRIC_DEFINE_INT_COUNTER(http_request_send_bytes, MetricUnit::BYTES);
+ METRIC_DEFINE_INT_COUNTER(query_scan_bytes, MetricUnit::BYTES);
+ METRIC_DEFINE_INT_COUNTER(query_scan_rows, MetricUnit::BYTES);
+ METRIC_DEFINE_INT_COUNTER(ranges_processed_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(push_requests_success_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(push_requests_fail_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(push_request_duration_us,
MetricUnit::MICROSECONDS);
+ METRIC_DEFINE_INT_COUNTER(push_request_write_bytes, MetricUnit::BYTES);
+ METRIC_DEFINE_INT_COUNTER(push_request_write_rows, MetricUnit::ROWS);
+ METRIC_DEFINE_INT_COUNTER(create_tablet_requests_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(create_tablet_requests_failed,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(drop_tablet_requests_total,
MetricUnit::NUMBER);
+
+ METRIC_DEFINE_INT_COUNTER(report_all_tablets_requests_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(report_all_tablets_requests_failed,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(report_tablet_requests_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(report_tablet_requests_failed,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(report_disk_requests_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(report_disk_requests_failed,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(report_task_requests_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(report_task_requests_failed,
MetricUnit::NUMBER);
+
+ METRIC_DEFINE_INT_COUNTER(schema_change_requests_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(schema_change_requests_failed,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(create_rollup_requests_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(create_rollup_requests_failed,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(storage_migrate_requests_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(delete_requests_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(delete_requests_failed, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(clone_requests_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(clone_requests_failed, MetricUnit::NUMBER);
+
+ METRIC_DEFINE_INT_COUNTER(finish_task_requests_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(finish_task_requests_failed,
MetricUnit::NUMBER);
+
+ METRIC_DEFINE_INT_COUNTER(base_compaction_request_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(base_compaction_request_failed,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(cumulative_compaction_request_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(cumulative_compaction_request_failed,
MetricUnit::NUMBER);
+
+ METRIC_DEFINE_INT_COUNTER(base_compaction_deltas_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(base_compaction_bytes_total,
MetricUnit::BYTES);
+ METRIC_DEFINE_INT_COUNTER(cumulative_compaction_deltas_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(cumulative_compaction_bytes_total,
MetricUnit::BYTES);
+
+ METRIC_DEFINE_INT_COUNTER(publish_task_request_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(publish_task_failed_total,
MetricUnit::NUMBER);
+
+ METRIC_DEFINE_INT_COUNTER(meta_write_request_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(meta_write_request_duration_us,
MetricUnit::MICROSECONDS);
+ METRIC_DEFINE_INT_COUNTER(meta_read_request_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(meta_read_request_duration_us,
MetricUnit::MICROSECONDS);
+
+ // Counters for segment_v2
+ // -----------------------
+ // total number of segments read
+ METRIC_DEFINE_INT_COUNTER(segment_read_total, MetricUnit::NUMBER);
+ // total number of rows in queried segments (before index pruning)
+ METRIC_DEFINE_INT_COUNTER(segment_row_total, MetricUnit::ROWS);
+ // total number of rows selected by short key index
+ METRIC_DEFINE_INT_COUNTER(segment_rows_by_short_key, MetricUnit::ROWS);
+ // total number of rows selected by zone map index
+ METRIC_DEFINE_INT_COUNTER(segment_rows_read_by_zone_map,
MetricUnit::ROWS);
+
+ METRIC_DEFINE_INT_COUNTER(txn_begin_request_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(txn_commit_request_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(txn_rollback_request_total,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(txn_exec_plan_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(stream_receive_bytes_total,
MetricUnit::BYTES);
+ METRIC_DEFINE_INT_COUNTER(stream_load_rows_total, MetricUnit::ROWS);
+ METRIC_DEFINE_INT_COUNTER(load_rows_total, MetricUnit::ROWS);
+ METRIC_DEFINE_INT_COUNTER(load_bytes_total, MetricUnit::BYTES);
+
+ METRIC_DEFINE_INT_COUNTER(memtable_flush_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(memtable_flush_duration_us,
MetricUnit::MICROSECONDS);
+
+ // Gauges
+ METRIC_DEFINE_INT_GAUGE(memory_pool_bytes_total, MetricUnit::BYTES);
+ METRIC_DEFINE_INT_GAUGE(process_thread_num, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_GAUGE(process_fd_num_used, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_GAUGE(process_fd_num_limit_soft, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_GAUGE(process_fd_num_limit_hard, MetricUnit::NUMBER);
IntGaugeMetricsMap disks_total_capacity;
IntGaugeMetricsMap disks_avail_capacity;
IntGaugeMetricsMap disks_data_used_capacity;
IntGaugeMetricsMap disks_state;
-
- // the max compaction score of all tablets.
- // Record base and cumulative scores separately, because
- // we need to get the larger of the two.
- IntGauge tablet_cumulative_max_compaction_score;
- IntGauge tablet_base_max_compaction_score;
-
- // The following metrics will be calculated
- // by metric calculator
- IntGauge push_request_write_bytes_per_second;
- IntGauge query_scan_bytes_per_second;
- IntGauge max_disk_io_util_percent;
- IntGauge max_network_send_bytes_rate;
- IntGauge max_network_receive_bytes_rate;
-
- // Metrics related with BlockManager
- IntCounter readable_blocks_total;
- IntCounter writable_blocks_total;
- IntCounter blocks_created_total;
- IntCounter blocks_deleted_total;
- IntCounter bytes_read_total;
- IntCounter bytes_written_total;
- IntCounter disk_sync_total;
- IntGauge blocks_open_reading;
- IntGauge blocks_open_writing;
-
- IntCounter blocks_push_remote_duration_us;
-
- // Size of some global containers
- UIntGauge rowset_count_generated_and_in_use;
- UIntGauge unused_rowsets_count;
- UIntGauge broker_count;
- UIntGauge data_stream_receiver_count;
- UIntGauge fragment_endpoint_count;
- UIntGauge active_scan_context_count;
- UIntGauge plan_fragment_count;
- UIntGauge load_channel_count;
- UIntGauge result_buffer_block_count;
- UIntGauge result_block_queue_count;
- UIntGauge routine_load_task_count;
- UIntGauge small_file_cache_count;
- UIntGauge stream_load_pipe_count;
- UIntGauge brpc_endpoint_stub_count;
- UIntGauge tablet_writer_count;
+
+ // the max compaction score of all tablets.
+ // Record base and cumulative scores separately, because
+ // we need to get the larger of the two.
+ METRIC_DEFINE_INT_GAUGE(tablet_cumulative_max_compaction_score,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_GAUGE(tablet_base_max_compaction_score,
MetricUnit::NUMBER);
+
+ // The following metrics will be calculated
+ // by metric calculator
+ METRIC_DEFINE_INT_GAUGE(push_request_write_bytes_per_second,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_GAUGE(query_scan_bytes_per_second,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_GAUGE(max_disk_io_util_percent, MetricUnit::PERCENT);
+ METRIC_DEFINE_INT_GAUGE(max_network_send_bytes_rate,
MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_GAUGE(max_network_receive_bytes_rate,
MetricUnit::NUMBER);
+
+ // Metrics related with BlockManager
+ METRIC_DEFINE_INT_COUNTER(readable_blocks_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(writable_blocks_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(blocks_created_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(blocks_deleted_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_COUNTER(bytes_read_total, MetricUnit::BYTES);
+ METRIC_DEFINE_INT_COUNTER(bytes_written_total, MetricUnit::BYTES);
+ METRIC_DEFINE_INT_COUNTER(disk_sync_total, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_GAUGE(blocks_open_reading, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_GAUGE(blocks_open_writing, MetricUnit::NUMBER);
+
+ METRIC_DEFINE_INT_COUNTER(blocks_push_remote_duration_us,
MetricUnit::MICROSECONDS);
+
+ // Size of some global containers
+ METRIC_DEFINE_UINT_GAUGE(rowset_count_generated_and_in_use,
MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(unused_rowsets_count, MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(broker_count, MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(data_stream_receiver_count,
MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(fragment_endpoint_count, MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(active_scan_context_count, MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(plan_fragment_count, MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(load_channel_count, MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(result_buffer_block_count, MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(result_block_queue_count, MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(routine_load_task_count, MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(small_file_cache_count, MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(stream_load_pipe_count, MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(brpc_endpoint_stub_count, MetricUnit::NUMBER);
+ METRIC_DEFINE_UINT_GAUGE(tablet_writer_count, MetricUnit::NUMBER);
static DorisMetrics* instance() {
static DorisMetrics instance;
diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp
index 3b37353..cbd5900 100644
--- a/be/src/util/metrics.cpp
+++ b/be/src/util/metrics.cpp
@@ -45,6 +45,29 @@ std::ostream& operator<<(std::ostream& os, MetricType type) {
return os;
}
+const char* unit_name(MetricUnit unit) {
+ switch (unit) {
+ case MetricUnit::NANOSECONDS:
+ return "nanoseconds";
+ case MetricUnit::MICROSECONDS:
+ return "microseconds";
+ case MetricUnit::MILLISECONDS:
+ return "milliseconds";
+ case MetricUnit::SECONDS:
+ return "seconds";
+ case MetricUnit::BYTES:
+ return "bytes";
+ case MetricUnit::ROWS:
+ return "rows";
+ case MetricUnit::NUMBER:
+ return "number";
+ case MetricUnit::PERCENT:
+ return "percent";
+ default:
+ return "nounit";
+ }
+}
+
void Metric::hide() {
if (_registry == nullptr) {
return;
@@ -56,8 +79,9 @@ void Metric::hide() {
bool MetricCollector::add_metic(const MetricLabels& labels, Metric* metric) {
if (empty()) {
_type = metric->type();
+ _unit = metric->unit();
} else {
- if (metric->type() != _type) {
+ if (metric->type() != _type || metric->unit() != _unit) {
return false;
}
}
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 331c5c3..46450c4 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -26,12 +26,17 @@
#include <mutex>
#include <iomanip>
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/document.h>
+
#include "common/config.h"
#include "util/spinlock.h"
#include "util/core_local.h"
namespace doris {
+namespace rj = RAPIDJSON_NAMESPACE;
+
class MetricRegistry;
enum class MetricType {
@@ -42,33 +47,49 @@ enum class MetricType {
UNTYPED
};
+enum class MetricUnit {
+ NANOSECONDS,
+ MICROSECONDS,
+ MILLISECONDS,
+ SECONDS,
+ BYTES,
+ ROWS,
+ NUMBER,
+ PERCENT,
+ NOUNIT
+};
+
std::ostream& operator<<(std::ostream& os, MetricType type);
+const char* unit_name(MetricUnit unit);
class Metric {
public:
- Metric(MetricType type) :_type(type), _registry(nullptr) { }
+ Metric(MetricType type, MetricUnit unit)
+ : _type(type),
+ _unit(unit),
+ _registry(nullptr) {}
virtual ~Metric() { hide(); }
+ virtual std::string to_string() const = 0;
MetricType type() const { return _type; }
+ MetricUnit unit() const { return _unit; }
void hide();
+ virtual void write_value(rj::Value& metric_obj,
+ rj::Document::AllocatorType& allocator) = 0;
private:
friend class MetricRegistry;
- MetricType _type;
+ MetricType _type = MetricType::UNTYPED;
+ MetricUnit _unit = MetricUnit::NOUNIT;
MetricRegistry* _registry;
};
-class SimpleMetric : public Metric {
-public:
- SimpleMetric(MetricType type) :Metric(type) { }
- virtual ~SimpleMetric() { }
- virtual std::string to_string() const = 0;
-};
-
// Metric that only can increment
template<typename T>
-class LockSimpleMetric : public SimpleMetric {
+class LockSimpleMetric : public Metric {
public:
- LockSimpleMetric(MetricType type) :SimpleMetric(type), _value(T()) { }
+ LockSimpleMetric(MetricType type, MetricUnit unit)
+ : Metric(type, unit),
+ _value(T()) {}
virtual ~LockSimpleMetric() { }
std::string to_string() const override {
@@ -76,6 +97,11 @@ public:
ss << value();
return ss.str();
}
+
+ void write_value(rj::Value& metric_obj,
+ rj::Document::AllocatorType& allocator) override {
+ metric_obj.AddMember("value", rj::Value(value()), allocator);
+ }
T value() const {
std::lock_guard<SpinLock> l(_lock);
@@ -103,9 +129,12 @@ protected:
};
template<typename T>
-class CoreLocalCounter : public SimpleMetric {
+class CoreLocalCounter : public Metric {
public:
- CoreLocalCounter() :SimpleMetric(MetricType::COUNTER), _value() { }
+ CoreLocalCounter(MetricUnit unit)
+ : Metric(MetricType::COUNTER, unit),
+ _value() {}
+
virtual ~CoreLocalCounter() { }
std::string to_string() const override {
@@ -113,6 +142,11 @@ public:
ss << value();
return ss.str();
}
+
+ void write_value(rj::Value& metric_obj,
+ rj::Document::AllocatorType& allocator) override {
+ metric_obj.AddMember("value", rj::Value(value()), allocator);
+ }
T value() const {
T sum = 0;
@@ -132,7 +166,8 @@ protected:
template<typename T>
class LockCounter : public LockSimpleMetric<T> {
public:
- LockCounter() :LockSimpleMetric<T>(MetricType::COUNTER) { }
+ LockCounter(MetricUnit unit)
+ : LockSimpleMetric<T>(MetricType::COUNTER, unit) {}
virtual ~LockCounter() { }
};
@@ -140,7 +175,8 @@ public:
template<typename T>
class LockGauge : public LockSimpleMetric<T> {
public:
- LockGauge() :LockSimpleMetric<T>(MetricType::GAUGE) { }
+ LockGauge(MetricUnit unit)
+ : LockSimpleMetric<T>(MetricType::GAUGE, unit) {}
virtual ~LockGauge() { }
};
@@ -274,8 +310,10 @@ public:
return _metrics;
}
MetricType type() const { return _type; }
+ MetricUnit unit() const { return _unit; }
private:
MetricType _type = MetricType::UNTYPED;
+ MetricUnit _unit = MetricUnit::NOUNIT;
std::map<MetricLabels, Metric*> _metrics;
};
@@ -343,4 +381,26 @@ using IntGauge = LockGauge<int64_t>;
using UIntGauge = LockGauge<uint64_t>;
using DoubleGauge = LockGauge<double>;
-}
+} // namespace doris
+
+// Convenience macros to metric
+#define METRIC_DEFINE_INT_COUNTER(metric_name, unit) \
+ doris::IntCounter metric_name{unit}
+
+#define METRIC_DEFINE_INT_LOCK_COUNTER(metric_name, unit) \
+ doris::IntLockCounter metric_name{unit}
+
+#define METRIC_DEFINE_UINT_COUNTER(metric_name, unit) \
+ doris::UIntCounter metric_name{unit}
+
+#define METRIC_DEFINE_DOUBLE_COUNTER(metric_name, unit) \
+ doris::DoubleCounter metric_name{unit}
+
+#define METRIC_DEFINE_INT_GAUGE(metric_name, unit) \
+ doris::IntGauge metric_name{unit}
+
+#define METRIC_DEFINE_UINT_GAUGE(metric_name, unit) \
+ doris::UIntGauge metric_name{unit}
+
+#define METRIC_DEFINE_DOUBLE_GAUGE(metric_name, unit) \
+ doris::DoubleGauge metric_name{unit}
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index 7b69153..07f4658 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -529,7 +529,7 @@ void RuntimeProfile::pretty_print(std::ostream* s, const
std::string& prefix) co
{
boost::lock_guard<boost::mutex> l(_info_strings_lock);
BOOST_FOREACH (const std::string& key, _info_strings_display_order) {
- stream << prefix << " " << key << ": " <<
_info_strings.find(key)->second << std::endl;
+ stream << prefix << " - " << key << ": " <<
_info_strings.find(key)->second << std::endl;
}
}
diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp
index ee43b20..26358e8 100644
--- a/be/src/util/system_metrics.cpp
+++ b/be/src/util/system_metrics.cpp
@@ -28,40 +28,47 @@ const char* SystemMetrics::_s_hook_name = "system_metrics";
// /proc/stat: http://www.linuxhowtos.org/System/procstat.htm
struct CpuMetrics {
- static constexpr int k_num_metrics = 10;
- static const char* k_names[k_num_metrics];
- IntLockCounter metrics[k_num_metrics];
+ static constexpr int cpu_num_metrics = 10;
+ IntLockCounter metrics[cpu_num_metrics] = {
+ {MetricUnit::PERCENT}, {MetricUnit::PERCENT},
+ {MetricUnit::PERCENT}, {MetricUnit::PERCENT},
+ {MetricUnit::PERCENT}, {MetricUnit::PERCENT},
+ {MetricUnit::PERCENT}, {MetricUnit::PERCENT},
+ {MetricUnit::PERCENT}, {MetricUnit::PERCENT}
+ };
+ static const char* cpu_metrics[cpu_num_metrics];
};
-const char* CpuMetrics::k_names[] = {
+const char* CpuMetrics::cpu_metrics[] = {
"user", "nice", "system", "idle", "iowait",
- "irq", "soft_irq", "steal", "guest", "guest_nice"};
+ "irq", "soft_irq", "steal", "guest", "guest_nice"
+};
struct MemoryMetrics {
- IntGauge allocated_bytes;
+ METRIC_DEFINE_INT_GAUGE(allocated_bytes, MetricUnit::BYTES);
};
struct DiskMetrics {
- IntLockCounter reads_completed;
- IntLockCounter bytes_read;
- IntLockCounter read_time_ms;
- IntLockCounter writes_completed;
- IntLockCounter bytes_written;
- IntLockCounter write_time_ms;
- IntLockCounter io_time_ms;
- IntLockCounter io_time_weigthed;
+ METRIC_DEFINE_INT_LOCK_COUNTER(reads_completed, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_LOCK_COUNTER(bytes_read, MetricUnit::BYTES);
+ METRIC_DEFINE_INT_LOCK_COUNTER(read_time_ms, MetricUnit::MILLISECONDS);
+ METRIC_DEFINE_INT_LOCK_COUNTER(writes_completed, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_LOCK_COUNTER(bytes_written, MetricUnit::BYTES);
+ METRIC_DEFINE_INT_LOCK_COUNTER(write_time_ms, MetricUnit::MILLISECONDS);
+ METRIC_DEFINE_INT_LOCK_COUNTER(io_time_ms, MetricUnit::MILLISECONDS);
+ METRIC_DEFINE_INT_LOCK_COUNTER(io_time_weigthed, MetricUnit::MILLISECONDS);
};
struct NetMetrics {
- IntLockCounter receive_bytes;
- IntLockCounter receive_packets;
- IntLockCounter send_bytes;
- IntLockCounter send_packets;
+ METRIC_DEFINE_INT_LOCK_COUNTER(receive_bytes, MetricUnit::BYTES);
+ METRIC_DEFINE_INT_LOCK_COUNTER(receive_packets, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_LOCK_COUNTER(send_bytes, MetricUnit::BYTES);
+ METRIC_DEFINE_INT_LOCK_COUNTER(send_packets, MetricUnit::NUMBER);
};
struct FileDescriptorMetrics {
- IntGauge fd_num_limit;
- IntGauge fd_num_used;
+ METRIC_DEFINE_INT_GAUGE(fd_num_limit, MetricUnit::NUMBER);
+ METRIC_DEFINE_INT_GAUGE(fd_num_used, MetricUnit::NUMBER);
};
SystemMetrics::SystemMetrics() {
@@ -110,9 +117,9 @@ void SystemMetrics::update() {
void SystemMetrics::_install_cpu_metrics(MetricRegistry* registry) {
_cpu_total.reset(new CpuMetrics());
- for (int i = 0; i < CpuMetrics::k_num_metrics; ++i) {
+ for (int i = 0; i < CpuMetrics::cpu_num_metrics; ++i) {
registry->register_metric("cpu",
- MetricLabels().add("mode",
CpuMetrics::k_names[i]),
+ MetricLabels().add("mode",
CpuMetrics::cpu_metrics[i]),
&_cpu_total->metrics[i]);
}
}
@@ -146,7 +153,7 @@ void SystemMetrics::_update_cpu_metrics() {
}
char cpu[16];
- int64_t values[CpuMetrics::k_num_metrics];
+ int64_t values[CpuMetrics::cpu_num_metrics];
memset(values, 0, sizeof(values));
sscanf(_line_ptr, "%15s"
" %" PRId64 " %" PRId64 " %" PRId64
@@ -159,7 +166,7 @@ void SystemMetrics::_update_cpu_metrics() {
&values[6], &values[7], &values[8],
&values[9]);
- for (int i = 0; i < CpuMetrics::k_num_metrics; ++i) {
+ for (int i = 0; i < CpuMetrics::cpu_num_metrics; ++i) {
_cpu_total->metrics[i].set_value(values[i]);
}
diff --git a/be/src/util/thrift_server.cpp b/be/src/util/thrift_server.cpp
index c9b69d8..9985732 100644
--- a/be/src/util/thrift_server.cpp
+++ b/be/src/util/thrift_server.cpp
@@ -276,12 +276,12 @@ ThriftServer::ThriftServer(
_session_handler(NULL) {
if (metrics != NULL) {
_metrics_enabled = true;
- _current_connections.reset(new IntGauge());
+ _current_connections.reset(new IntGauge(MetricUnit::NUMBER));
metrics->register_metric("thrift_current_connections",
MetricLabels().add("name", name),
_current_connections.get());
- _connections_total.reset(new IntCounter());
+ _connections_total.reset(new IntCounter(MetricUnit::NUMBER));
metrics->register_metric("thrift_connections_total",
MetricLabels().add("name", name),
_connections_total.get());
diff --git a/be/test/http/metrics_action_test.cpp
b/be/test/http/metrics_action_test.cpp
index 4c63761..6bbab74 100644
--- a/be/test/http/metrics_action_test.cpp
+++ b/be/test/http/metrics_action_test.cpp
@@ -53,10 +53,10 @@ private:
TEST_F(MetricsActionTest, prometheus_output) {
MetricRegistry registry("test");
- IntGauge cpu_idle;
+ IntGauge cpu_idle(MetricUnit::PERCENT);
cpu_idle.set_value(50);
registry.register_metric("cpu_idle", &cpu_idle);
- IntCounter put_requests_total;
+ IntCounter put_requests_total(MetricUnit::NUMBER);
put_requests_total.increment(2345);
registry.register_metric("requests_total",
MetricLabels().add("type", "put").add("path",
"/sports"),
@@ -73,7 +73,7 @@ TEST_F(MetricsActionTest, prometheus_output) {
TEST_F(MetricsActionTest, prometheus_no_prefix) {
MetricRegistry registry("");
- IntGauge cpu_idle;
+ IntGauge cpu_idle(MetricUnit::PERCENT);
cpu_idle.set_value(50);
registry.register_metric("cpu_idle", &cpu_idle);
s_expect_response =
@@ -86,7 +86,7 @@ TEST_F(MetricsActionTest, prometheus_no_prefix) {
TEST_F(MetricsActionTest, prometheus_no_name) {
MetricRegistry registry("test");
- IntGauge cpu_idle;
+ IntGauge cpu_idle(MetricUnit::PERCENT);
cpu_idle.set_value(50);
registry.register_metric("", &cpu_idle);
s_expect_response = "";
diff --git a/be/test/util/doris_metrics_test.cpp
b/be/test/util/doris_metrics_test.cpp
index e308f45..76a6a2e 100644
--- a/be/test/util/doris_metrics_test.cpp
+++ b/be/test/util/doris_metrics_test.cpp
@@ -60,7 +60,7 @@ public:
_ss << "}";
}
}
- _ss << " " << ((SimpleMetric*)metric)->to_string() <<
std::endl;
+ _ss << " " << metric->to_string() << std::endl;
break;
}
default:
@@ -85,81 +85,81 @@ TEST_F(DorisMetricsTest, Normal) {
DorisMetrics::instance()->fragment_requests_total.increment(12);
auto metric = metrics->get_metric("fragment_requests_total");
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("12", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("12", metric->to_string().c_str());
}
{
DorisMetrics::instance()->fragment_request_duration_us.increment(101);
auto metric = metrics->get_metric("fragment_request_duration_us");
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("101", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("101", metric->to_string().c_str());
}
{
DorisMetrics::instance()->http_requests_total.increment(102);
auto metric = metrics->get_metric("http_requests_total");
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("102", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("102", metric->to_string().c_str());
}
{
DorisMetrics::instance()->http_request_duration_us.increment(103);
auto metric = metrics->get_metric("http_request_duration_us");
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("103", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("103", metric->to_string().c_str());
}
{
DorisMetrics::instance()->http_request_send_bytes.increment(104);
auto metric = metrics->get_metric("http_request_send_bytes");
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("104", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("104", metric->to_string().c_str());
}
{
DorisMetrics::instance()->query_scan_bytes.increment(104);
auto metric = metrics->get_metric("query_scan_bytes");
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("104", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("104", metric->to_string().c_str());
}
{
DorisMetrics::instance()->query_scan_rows.increment(105);
auto metric = metrics->get_metric("query_scan_rows");
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("105", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("105", metric->to_string().c_str());
}
{
DorisMetrics::instance()->ranges_processed_total.increment(13);
auto metric = metrics->get_metric("ranges_processed_total");
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("13", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("13", metric->to_string().c_str());
}
{
DorisMetrics::instance()->push_requests_success_total.increment(106);
auto metric = metrics->get_metric("push_requests_total",
MetricLabels().add("status",
"SUCCESS"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("106", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("106", metric->to_string().c_str());
}
{
DorisMetrics::instance()->push_requests_fail_total.increment(107);
auto metric = metrics->get_metric("push_requests_total",
MetricLabels().add("status",
"FAIL"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("107", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("107", metric->to_string().c_str());
}
{
DorisMetrics::instance()->push_request_duration_us.increment(108);
auto metric = metrics->get_metric("push_request_duration_us");
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("108", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("108", metric->to_string().c_str());
}
{
DorisMetrics::instance()->push_request_write_bytes.increment(109);
auto metric = metrics->get_metric("push_request_write_bytes");
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("109", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("109", metric->to_string().c_str());
}
{
DorisMetrics::instance()->push_request_write_rows.increment(110);
auto metric = metrics->get_metric("push_request_write_rows");
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("110", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("110", metric->to_string().c_str());
}
// engine request
{
@@ -168,7 +168,7 @@ TEST_F(DorisMetricsTest, Normal) {
MetricLabels().add("type",
"create_tablet")
.add("status", "total"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("15", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("15", metric->to_string().c_str());
}
{
DorisMetrics::instance()->drop_tablet_requests_total.increment(16);
@@ -176,7 +176,7 @@ TEST_F(DorisMetricsTest, Normal) {
MetricLabels().add("type",
"drop_tablet")
.add("status", "total"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("16", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("16", metric->to_string().c_str());
}
{
DorisMetrics::instance()->report_all_tablets_requests_total.increment(17);
@@ -184,7 +184,7 @@ TEST_F(DorisMetricsTest, Normal) {
MetricLabels().add("type",
"report_all_tablets")
.add("status", "total"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("17", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("17", metric->to_string().c_str());
}
{
DorisMetrics::instance()->report_tablet_requests_total.increment(18);
@@ -192,7 +192,7 @@ TEST_F(DorisMetricsTest, Normal) {
MetricLabels().add("type",
"report_tablet")
.add("status", "total"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("18", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("18", metric->to_string().c_str());
}
{
DorisMetrics::instance()->schema_change_requests_total.increment(19);
@@ -200,7 +200,7 @@ TEST_F(DorisMetricsTest, Normal) {
MetricLabels().add("type",
"schema_change")
.add("status", "total"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("19", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("19", metric->to_string().c_str());
}
{
DorisMetrics::instance()->create_rollup_requests_total.increment(20);
@@ -208,7 +208,7 @@ TEST_F(DorisMetricsTest, Normal) {
MetricLabels().add("type",
"create_rollup")
.add("status", "total"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("20", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("20", metric->to_string().c_str());
}
{
DorisMetrics::instance()->storage_migrate_requests_total.increment(21);
@@ -216,7 +216,7 @@ TEST_F(DorisMetricsTest, Normal) {
MetricLabels().add("type",
"storage_migrate")
.add("status", "total"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("21", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("21", metric->to_string().c_str());
}
{
DorisMetrics::instance()->delete_requests_total.increment(22);
@@ -224,7 +224,7 @@ TEST_F(DorisMetricsTest, Normal) {
MetricLabels().add("type", "delete")
.add("status", "total"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("22", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("22", metric->to_string().c_str());
}
// comapction
{
@@ -232,35 +232,35 @@ TEST_F(DorisMetricsTest, Normal) {
auto metric = metrics->get_metric("compaction_deltas_total",
MetricLabels().add("type", "base"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("30", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("30", metric->to_string().c_str());
}
{
DorisMetrics::instance()->cumulative_compaction_deltas_total.increment(31);
auto metric = metrics->get_metric("compaction_deltas_total",
MetricLabels().add("type",
"cumulative"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("31", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("31", metric->to_string().c_str());
}
{
DorisMetrics::instance()->base_compaction_bytes_total.increment(32);
auto metric = metrics->get_metric("compaction_bytes_total",
MetricLabels().add("type", "base"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("32", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("32", metric->to_string().c_str());
}
{
DorisMetrics::instance()->cumulative_compaction_bytes_total.increment(33);
auto metric = metrics->get_metric("compaction_bytes_total",
MetricLabels().add("type",
"cumulative"));
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("33", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("33", metric->to_string().c_str());
}
// Gauge
{
DorisMetrics::instance()->memory_pool_bytes_total.increment(40);
auto metric = metrics->get_metric("memory_pool_bytes_total");
ASSERT_TRUE(metric != nullptr);
- ASSERT_STREQ("40", ((SimpleMetric*)metric)->to_string().c_str());
+ ASSERT_STREQ("40", metric->to_string().c_str());
}
}
diff --git a/be/test/util/new_metrics_test.cpp
b/be/test/util/new_metrics_test.cpp
index d35d6cf..649593b 100644
--- a/be/test/util/new_metrics_test.cpp
+++ b/be/test/util/new_metrics_test.cpp
@@ -36,7 +36,7 @@ public:
TEST_F(MetricsTest, Counter) {
{
- IntCounter counter;
+ IntCounter counter(MetricUnit::NUMBER);
ASSERT_EQ(0, counter.value());
counter.increment(100);
ASSERT_EQ(100, counter.value());
@@ -44,7 +44,7 @@ TEST_F(MetricsTest, Counter) {
ASSERT_STREQ("100", counter.to_string().c_str());
}
{
- DoubleCounter counter;
+ DoubleCounter counter(MetricUnit::NUMBER);
ASSERT_EQ(0.0, counter.value());
counter.increment(1.23);
ASSERT_EQ(1.23, counter.value());
@@ -65,7 +65,7 @@ void mt_updater(IntCounter* counter, std::atomic<uint64_t>*
used_time) {
}
TEST_F(MetricsTest, CounterPerf) {
- IntCounter counter;
+ IntCounter counter(MetricUnit::NUMBER);
volatile int64_t sum = 0;
{
@@ -91,7 +91,7 @@ TEST_F(MetricsTest, CounterPerf) {
ASSERT_EQ(100000000, counter.value());
ASSERT_EQ(100000000, sum);
{
- IntCounter mt_counter;
+ IntCounter mt_counter(MetricUnit::NUMBER);
std::vector<std::thread> updaters;
std::atomic<uint64_t> used_time(0);
for (int i = 0; i < 8; ++i) {
@@ -108,7 +108,7 @@ TEST_F(MetricsTest, CounterPerf) {
TEST_F(MetricsTest, Gauge) {
{
- IntGauge gauge;
+ IntGauge gauge(MetricUnit::NUMBER);
ASSERT_EQ(0, gauge.value());
gauge.set_value(100);
ASSERT_EQ(100, gauge.value());
@@ -116,7 +116,7 @@ TEST_F(MetricsTest, Gauge) {
ASSERT_STREQ("100", gauge.to_string().c_str());
}
{
- DoubleGauge gauge;
+ DoubleGauge gauge(MetricUnit::NUMBER);
ASSERT_EQ(0.0, gauge.value());
gauge.set_value(1.23);
ASSERT_EQ(1.23, gauge.value());
@@ -189,7 +189,7 @@ public:
}
_ss << labels.to_string();
}
- _ss << " " << ((SimpleMetric*)metric)->to_string() <<
std::endl;
+ _ss << " " << metric->to_string() << std::endl;
break;
}
default:
@@ -205,9 +205,9 @@ private:
};
TEST_F(MetricsTest, MetricCollector) {
- IntCounter puts;
+ IntCounter puts(MetricUnit::NUMBER);
puts.increment(101);
- IntCounter gets;
+ IntCounter gets(MetricUnit::NUMBER);
gets.increment(201);
MetricCollector collector;
ASSERT_TRUE(collector.add_metic(MetricLabels().add("type", "put"), &puts));
@@ -216,7 +216,7 @@ TEST_F(MetricsTest, MetricCollector) {
{
// Can't add different type to one collector
- IntGauge post;
+ IntGauge post(MetricUnit::NUMBER);
ASSERT_FALSE(collector.add_metic(MetricLabels().add("type", "post"),
&post));
}
@@ -241,13 +241,13 @@ TEST_F(MetricsTest, MetricCollector) {
TEST_F(MetricsTest, MetricRegistry) {
MetricRegistry registry("test");
- IntCounter cpu_idle;
+ IntCounter cpu_idle(MetricUnit::PERCENT);
cpu_idle.increment(12);
ASSERT_TRUE(registry.register_metric("cpu_idle", &cpu_idle));
// registry failed
- IntCounter dummy;
+ IntCounter dummy(MetricUnit::PERCENT);
ASSERT_FALSE(registry.register_metric("cpu_idle", &dummy));
- IntCounter memory_usage;
+ IntCounter memory_usage(MetricUnit::BYTES);
memory_usage.increment(24);
ASSERT_TRUE(registry.register_metric("memory_usage", &memory_usage));
{
@@ -268,13 +268,13 @@ TEST_F(MetricsTest, MetricRegistry) {
TEST_F(MetricsTest, MetricRegistry2) {
MetricRegistry registry("test");
- IntCounter cpu_idle;
+ IntCounter cpu_idle(MetricUnit::PERCENT);
cpu_idle.increment(12);
ASSERT_TRUE(registry.register_metric("cpu_idle", &cpu_idle));
{
// memory_usage will deregister after this block
- IntCounter memory_usage;
+ IntCounter memory_usage(MetricUnit::BYTES);
memory_usage.increment(24);
ASSERT_TRUE(registry.register_metric("memory_usage", &memory_usage));
TestMetricsVisitor visitor;
diff --git a/be/test/util/system_metrics_test.cpp
b/be/test/util/system_metrics_test.cpp
index 70e1d3e..4ec253e 100644
--- a/be/test/util/system_metrics_test.cpp
+++ b/be/test/util/system_metrics_test.cpp
@@ -65,7 +65,7 @@ public:
_ss << "}";
}
}
- _ss << " " << ((SimpleMetric*)metric)->to_string() <<
std::endl;
+ _ss << " " << metric->to_string() << std::endl;
break;
}
default:
@@ -118,104 +118,104 @@ TEST_F(SystemMetricsTest, normal) {
LOG(INFO) << "\n" << visitor.to_string();
// cpu
- SimpleMetric* cpu_user = (SimpleMetric*)registry.get_metric(
+ Metric* cpu_user = registry.get_metric(
"cpu", MetricLabels().add("mode", "user"));
ASSERT_TRUE(cpu_user != nullptr);
// ASSERT_STREQ("57199151", cpu_user->to_string().c_str());
- SimpleMetric* cpu_nice = (SimpleMetric*)registry.get_metric(
+ Metric* cpu_nice = registry.get_metric(
"cpu", MetricLabels().add("mode", "nice"));
ASSERT_TRUE(cpu_nice != nullptr);
ASSERT_STREQ("2616310", cpu_nice->to_string().c_str());
- SimpleMetric* cpu_system = (SimpleMetric*)registry.get_metric(
+ Metric* cpu_system = registry.get_metric(
"cpu", MetricLabels().add("mode", "system"));
ASSERT_TRUE(cpu_system != nullptr);
ASSERT_STREQ("10600935", cpu_system->to_string().c_str());
- SimpleMetric* cpu_idle = (SimpleMetric*)registry.get_metric(
+ Metric* cpu_idle = registry.get_metric(
"cpu", MetricLabels().add("mode", "idle"));
ASSERT_TRUE(cpu_idle != nullptr);
ASSERT_STREQ("1517505423", cpu_idle->to_string().c_str());
- SimpleMetric* cpu_iowait = (SimpleMetric*)registry.get_metric(
+ Metric* cpu_iowait = registry.get_metric(
"cpu", MetricLabels().add("mode", "iowait"));
ASSERT_TRUE(cpu_iowait != nullptr);
ASSERT_STREQ("2137148", cpu_iowait->to_string().c_str());
- SimpleMetric* cpu_irq = (SimpleMetric*)registry.get_metric(
+ Metric* cpu_irq = registry.get_metric(
"cpu", MetricLabels().add("mode", "irq"));
ASSERT_TRUE(cpu_irq != nullptr);
ASSERT_STREQ("0", cpu_irq->to_string().c_str());
- SimpleMetric* cpu_softirq = (SimpleMetric*)registry.get_metric(
+ Metric* cpu_softirq = registry.get_metric(
"cpu", MetricLabels().add("mode", "soft_irq"));
ASSERT_TRUE(cpu_softirq != nullptr);
ASSERT_STREQ("108277", cpu_softirq->to_string().c_str());
- SimpleMetric* cpu_steal = (SimpleMetric*)registry.get_metric(
+ Metric* cpu_steal = registry.get_metric(
"cpu", MetricLabels().add("mode", "steal"));
ASSERT_TRUE(cpu_steal != nullptr);
ASSERT_STREQ("0", cpu_steal->to_string().c_str());
- SimpleMetric* cpu_guest = (SimpleMetric*)registry.get_metric(
+ Metric* cpu_guest = registry.get_metric(
"cpu", MetricLabels().add("mode", "guest"));
ASSERT_TRUE(cpu_guest != nullptr);
ASSERT_STREQ("0", cpu_guest->to_string().c_str());
// memroy
- SimpleMetric* memory_allocated_bytes =
(SimpleMetric*)registry.get_metric(
+ Metric* memory_allocated_bytes = registry.get_metric(
"memory_allocated_bytes");
ASSERT_TRUE(memory_allocated_bytes != nullptr);
// network
- SimpleMetric* receive_bytes = (SimpleMetric*)registry.get_metric(
+ Metric* receive_bytes = registry.get_metric(
"network_receive_bytes", MetricLabels().add("device", "xgbe0"));
ASSERT_TRUE(receive_bytes != nullptr);
ASSERT_STREQ("52567436039", receive_bytes->to_string().c_str());
- SimpleMetric* receive_packets = (SimpleMetric*)registry.get_metric(
+ Metric* receive_packets = registry.get_metric(
"network_receive_packets", MetricLabels().add("device", "xgbe0"));
ASSERT_TRUE(receive_packets != nullptr);
ASSERT_STREQ("65066152", receive_packets->to_string().c_str());
- SimpleMetric* send_bytes = (SimpleMetric*)registry.get_metric(
+ Metric* send_bytes = registry.get_metric(
"network_send_bytes", MetricLabels().add("device", "xgbe0"));
ASSERT_TRUE(send_bytes != nullptr);
ASSERT_STREQ("45480856156", send_bytes->to_string().c_str());
- SimpleMetric* send_packets = (SimpleMetric*)registry.get_metric(
+ Metric* send_packets = registry.get_metric(
"network_send_packets", MetricLabels().add("device", "xgbe0"));
ASSERT_TRUE(send_packets != nullptr);
ASSERT_STREQ("88277614", send_packets->to_string().c_str());
// disk
- SimpleMetric* bytes_read = (SimpleMetric*)registry.get_metric(
+ Metric* bytes_read = registry.get_metric(
"disk_bytes_read", MetricLabels().add("device", "sda"));
ASSERT_TRUE(bytes_read != nullptr);
ASSERT_STREQ("20142745600", bytes_read->to_string().c_str());
- SimpleMetric* reads_completed = (SimpleMetric*)registry.get_metric(
+ Metric* reads_completed = registry.get_metric(
"disk_reads_completed", MetricLabels().add("device", "sda"));
ASSERT_TRUE(reads_completed != nullptr);
ASSERT_STREQ("759548", reads_completed->to_string().c_str());
- SimpleMetric* read_time_ms = (SimpleMetric*)registry.get_metric(
+ Metric* read_time_ms = registry.get_metric(
"disk_read_time_ms", MetricLabels().add("device", "sda"));
ASSERT_TRUE(read_time_ms != nullptr);
ASSERT_STREQ("4308146", read_time_ms->to_string().c_str());
- SimpleMetric* bytes_written = (SimpleMetric*)registry.get_metric(
+ Metric* bytes_written = registry.get_metric(
"disk_bytes_written", MetricLabels().add("device", "sda"));
ASSERT_TRUE(bytes_written != nullptr);
ASSERT_STREQ("1624753500160", bytes_written->to_string().c_str());
- SimpleMetric* writes_completed = (SimpleMetric*)registry.get_metric(
+ Metric* writes_completed = registry.get_metric(
"disk_writes_completed", MetricLabels().add("device", "sda"));
ASSERT_TRUE(writes_completed != nullptr);
ASSERT_STREQ("18282936", writes_completed->to_string().c_str());
- SimpleMetric* write_time_ms = (SimpleMetric*)registry.get_metric(
+ Metric* write_time_ms = registry.get_metric(
"disk_write_time_ms", MetricLabels().add("device", "sda"));
ASSERT_TRUE(write_time_ms != nullptr);
ASSERT_STREQ("1907755230", write_time_ms->to_string().c_str());
- SimpleMetric* io_time_ms = (SimpleMetric*)registry.get_metric(
+ Metric* io_time_ms = registry.get_metric(
"disk_io_time_ms", MetricLabels().add("device", "sda"));
ASSERT_TRUE(io_time_ms != nullptr);
ASSERT_STREQ("19003350", io_time_ms->to_string().c_str());
- SimpleMetric* io_time_weigthed = (SimpleMetric*)registry.get_metric(
+ Metric* io_time_weigthed = registry.get_metric(
"disk_io_time_weigthed", MetricLabels().add("device", "sda"));
ASSERT_TRUE(write_time_ms != nullptr);
ASSERT_STREQ("1912122964", io_time_weigthed->to_string().c_str());
// fd
- SimpleMetric* fd_metric = (SimpleMetric*)registry.get_metric(
+ Metric* fd_metric = registry.get_metric(
"fd_num_limit");
ASSERT_TRUE(fd_metric != nullptr);
ASSERT_STREQ("13052138", fd_metric->to_string().c_str());
- fd_metric = (SimpleMetric*)registry.get_metric(
+ fd_metric = registry.get_metric(
"fd_num_used");
ASSERT_TRUE(fd_metric != nullptr);
ASSERT_STREQ("19520", fd_metric->to_string().c_str());
@@ -263,21 +263,21 @@ TEST_F(SystemMetricsTest, no_proc_file) {
LOG(INFO) << "\n" << visitor.to_string();
// cpu
- SimpleMetric* cpu_user = (SimpleMetric*)registry.get_metric(
+ Metric* cpu_user = registry.get_metric(
"cpu", MetricLabels().add("mode", "user"));
ASSERT_TRUE(cpu_user != nullptr);
ASSERT_STREQ("0", cpu_user->to_string().c_str());
// memroy
- SimpleMetric* memory_allocated_bytes =
(SimpleMetric*)registry.get_metric(
+ Metric* memory_allocated_bytes = registry.get_metric(
"memory_allocated_bytes");
ASSERT_TRUE(memory_allocated_bytes != nullptr);
// network
- SimpleMetric* receive_bytes = (SimpleMetric*)registry.get_metric(
+ Metric* receive_bytes = registry.get_metric(
"network_receive_bytes", MetricLabels().add("device", "xgbe0"));
ASSERT_TRUE(receive_bytes != nullptr);
ASSERT_STREQ("0", receive_bytes->to_string().c_str());
// disk
- SimpleMetric* bytes_read = (SimpleMetric*)registry.get_metric(
+ Metric* bytes_read = registry.get_metric(
"disk_bytes_read", MetricLabels().add("device", "sda"));
ASSERT_TRUE(bytes_read != nullptr);
ASSERT_STREQ("0", bytes_read->to_string().c_str());
diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
index f7c67c7..814dc49 100644
--- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -34,6 +34,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
@@ -294,6 +295,7 @@ public class InsertStmt extends DdlStmt {
if (targetTable instanceof OlapTable) {
LoadJobSourceType sourceType =
LoadJobSourceType.INSERT_STREAMING;
+ MetricRepo.COUNTER_LOAD_ADD.increase(1L);
transactionId =
Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
Lists.newArrayList(targetTable.getId()), label,
new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
diff --git a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java
b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 20d2fc2..ef9b088 100644
--- a/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.common;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.doris.metric.GaugeMetric;
+import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;
import org.apache.logging.log4j.LogManager;
@@ -68,7 +69,7 @@ public class ThreadPoolManager {
public static void registerThreadPoolMetric(String poolName,
ThreadPoolExecutor threadPool) {
for (String poolMetricType : poolMerticTypes) {
- GaugeMetric<Integer> gauge = new
GaugeMetric<Integer>("thread_pool", "thread_pool statistics") {
+ GaugeMetric<Integer> gauge = new
GaugeMetric<Integer>("thread_pool", MetricUnit.NUMBER, "thread_pool
statistics") {
@Override
public Integer getValue() {
String metricType = this.getLabels().get(1).getValue();
diff --git
a/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java
b/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java
index f0f89a3..b278c47 100644
---
a/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java
+++
b/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java
@@ -29,17 +29,20 @@ import java.util.List;
public class IncompleteTabletsProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
- .add("UnhealthyTablets").add("InconsistentTablets")
+
.add("UnhealthyTablets").add("InconsistentTablets").add("CloningTablets")
.build();
-
private static final Joiner JOINER = Joiner.on(",");
Collection<Long> unhealthyTabletIds;
Collection<Long> inconsistentTabletIds;
+ Collection<Long> cloningTabletIds;
- public IncompleteTabletsProcNode(Collection<Long> unhealthyTabletIds,
Collection<Long> inconsistentTabletIds) {
+ public IncompleteTabletsProcNode(Collection<Long> unhealthyTabletIds,
+ Collection<Long> inconsistentTabletIds,
+ Collection<Long> cloningTabletIds) {
this.unhealthyTabletIds = unhealthyTabletIds;
this.inconsistentTabletIds = inconsistentTabletIds;
+ this.cloningTabletIds = cloningTabletIds;
}
@Override
@@ -52,8 +55,10 @@ public class IncompleteTabletsProcNode implements
ProcNodeInterface {
String incompleteTablets =
JOINER.join(Arrays.asList(unhealthyTabletIds));
String inconsistentTablets =
JOINER.join(Arrays.asList(inconsistentTabletIds));
+ String cloningTablets = JOINER.join(Arrays.asList(cloningTabletIds));
row.add(incompleteTablets);
row.add(inconsistentTablets);
+ row.add(cloningTablets);
result.addRow(row);
diff --git
a/fe/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
b/fe/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
index 72ca897..e75a3fe 100644
--- a/fe/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
+++ b/fe/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
@@ -32,6 +32,8 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
@@ -42,12 +44,16 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
public class StatisticProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>()
.add("DbId").add("DbName").add("TableNum").add("PartitionNum")
.add("IndexNum").add("TabletNum").add("ReplicaNum").add("UnhealthyTabletNum")
- .add("InconsistentTabletNum")
+ .add("InconsistentTabletNum").add("CloningTabletNum")
.build();
+ private static final Logger LOG =
LogManager.getLogger(StatisticProcDir.class);
private Catalog catalog;
@@ -55,11 +61,14 @@ public class StatisticProcDir implements ProcDirInterface {
Multimap<Long, Long> unhealthyTabletIds;
// db id -> set(tablet id)
Multimap<Long, Long> inconsistentTabletIds;
+ // db id -> set(tablet id)
+ Multimap<Long, Long> cloningTabletIds;
public StatisticProcDir(Catalog catalog) {
this.catalog = catalog;
unhealthyTabletIds = HashMultimap.create();
inconsistentTabletIds = HashMultimap.create();
+ cloningTabletIds = HashMultimap.create();
}
@Override
@@ -86,6 +95,7 @@ public class StatisticProcDir implements ProcDirInterface {
unhealthyTabletIds.clear();
inconsistentTabletIds.clear();
+ cloningTabletIds = AgentTaskQueue.getTabletIdsByType(TTaskType.CLONE);
List<List<Comparable>> lines = new ArrayList<List<Comparable>>();
for (Long dbId : dbIds) {
if (dbId == 0) {
@@ -153,6 +163,7 @@ public class StatisticProcDir implements ProcDirInterface {
oneLine.add(dbReplicaNum);
oneLine.add(unhealthyTabletIds.get(dbId).size());
oneLine.add(inconsistentTabletIds.get(dbId).size());
+ oneLine.add(cloningTabletIds.get(dbId).size());
lines.add(oneLine);
@@ -181,6 +192,7 @@ public class StatisticProcDir implements ProcDirInterface {
finalLine.add(totalReplicaNum);
finalLine.add(unhealthyTabletIds.size());
finalLine.add(inconsistentTabletIds.size());
+ finalLine.add(cloningTabletIds.size());
lines.add(finalLine);
// add result
@@ -209,6 +221,8 @@ public class StatisticProcDir implements ProcDirInterface {
throw new AnalysisException("Invalid db id format: " + dbIdStr);
}
- return new IncompleteTabletsProcNode(unhealthyTabletIds.get(dbId),
inconsistentTabletIds.get(dbId));
+ return new IncompleteTabletsProcNode(unhealthyTabletIds.get(dbId),
+ inconsistentTabletIds.get(dbId),
+ cloningTabletIds.get(dbId));
}
}
diff --git a/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java
b/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java
index 10e1874..24a6224 100644
--- a/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java
+++ b/fe/src/main/java/org/apache/doris/http/rest/MetricsAction.java
@@ -23,6 +23,7 @@ import org.apache.doris.http.BaseResponse;
import org.apache.doris.http.IllegalArgException;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.metric.MetricVisitor;
+import org.apache.doris.metric.JsonMetricVisitor;
import org.apache.doris.metric.PrometheusMetricVisitor;
import org.apache.doris.metric.SimpleCoreMetricVisitor;
@@ -50,6 +51,8 @@ public class MetricsAction extends RestBaseAction {
MetricVisitor visitor = null;
if (!Strings.isNullOrEmpty(type) && type.equalsIgnoreCase("core")) {
visitor = new SimpleCoreMetricVisitor("doris_fe");
+ } else if (!Strings.isNullOrEmpty(type) &&
type.equalsIgnoreCase("agent")) {
+ visitor = new JsonMetricVisitor("doris_fe");
} else {
visitor = new PrometheusMetricVisitor("doris_fe");
}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 7222dd8..c4d7d2c 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -45,6 +45,7 @@ import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
@@ -209,6 +210,7 @@ public class BrokerLoadJob extends LoadJob {
@Override
public void beginTxn()
throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException {
+ MetricRepo.COUNTER_LOAD_ADD.increase(1L);
transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId,
Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
@@ -468,6 +470,7 @@ public class BrokerLoadJob extends LoadJob {
.add("txn_id", transactionId)
.add("msg", "Load job try to commit txn")
.build());
+ MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
Catalog.getCurrentGlobalTransactionMgr().commitTransaction(
dbId, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress,
loadStartTimestamp,
diff --git
a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 218bfd1..28469fc 100644
---
a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.transaction.BeginTransactionException;
@@ -165,6 +166,7 @@ public abstract class RoutineLoadTaskInfo {
// begin a txn for task
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
try {
+ MetricRepo.COUNTER_LOAD_ADD.increase(1L);
txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
routineLoadJob.getDbId(),
Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null,
new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
index 8dbe044..83ed5b7 100644
--- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -38,6 +38,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.metric.GaugeMetric;
+import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.ReplicaPersistInfo;
@@ -96,7 +97,7 @@ public class ReportHandler extends Daemon {
public ReportHandler() {
GaugeMetric<Long> gaugeQueueSize = new GaugeMetric<Long>(
- "report_queue_size", "report queue size") {
+ "report_queue_size", MetricUnit.NUMBER, "report queue size") {
@Override
public Long getValue() {
return (long) reportQueue.size();
diff --git a/fe/src/main/java/org/apache/doris/metric/CounterMetric.java
b/fe/src/main/java/org/apache/doris/metric/CounterMetric.java
index 9cdd452..ecc96a4 100644
--- a/fe/src/main/java/org/apache/doris/metric/CounterMetric.java
+++ b/fe/src/main/java/org/apache/doris/metric/CounterMetric.java
@@ -22,8 +22,8 @@ package org.apache.doris.metric;
*/
public abstract class CounterMetric<T> extends Metric<T> {
- public CounterMetric(String name, String description) {
- super(name, MetricType.COUNTER, description);
+ public CounterMetric(String name, MetricUnit unit, String description) {
+ super(name, MetricType.COUNTER, unit, description);
}
abstract public void increase(T delta);
diff --git a/fe/src/main/java/org/apache/doris/metric/DoubleCounterMetric.java
b/fe/src/main/java/org/apache/doris/metric/DoubleCounterMetric.java
index b4b9240..c3cf3b8 100644
--- a/fe/src/main/java/org/apache/doris/metric/DoubleCounterMetric.java
+++ b/fe/src/main/java/org/apache/doris/metric/DoubleCounterMetric.java
@@ -21,8 +21,8 @@ import com.google.common.util.concurrent.AtomicDouble;
public class DoubleCounterMetric extends CounterMetric<Double> {
- public DoubleCounterMetric(String name, String description) {
- super(name, description);
+ public DoubleCounterMetric(String name, MetricUnit unit, String
description) {
+ super(name, unit, description);
}
private AtomicDouble value = new AtomicDouble(0.0);
diff --git a/fe/src/main/java/org/apache/doris/metric/GaugeMetric.java
b/fe/src/main/java/org/apache/doris/metric/GaugeMetric.java
index 581da72..2e8d819 100644
--- a/fe/src/main/java/org/apache/doris/metric/GaugeMetric.java
+++ b/fe/src/main/java/org/apache/doris/metric/GaugeMetric.java
@@ -22,7 +22,7 @@ package org.apache.doris.metric;
*/
public abstract class GaugeMetric<T> extends Metric<T> {
- public GaugeMetric(String name, String description) {
- super(name, MetricType.GAUGE, description);
+ public GaugeMetric(String name, MetricUnit unit, String description) {
+ super(name, MetricType.GAUGE, unit, description);
}
}
diff --git a/fe/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java
b/fe/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java
index b4cbe4c..a66bc4f 100644
--- a/fe/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java
+++ b/fe/src/main/java/org/apache/doris/metric/GaugeMetricImpl.java
@@ -19,8 +19,8 @@ package org.apache.doris.metric;
public class GaugeMetricImpl<T> extends GaugeMetric<T> {
- public GaugeMetricImpl(String name, String description) {
- super(name, description);
+ public GaugeMetricImpl(String name, MetricUnit unit, String description) {
+ super(name, unit, description);
}
private T value;
diff --git a/fe/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java
b/fe/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java
new file mode 100644
index 0000000..2463a7a
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/metric/JsonMetricVisitor.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.metric;
+
+import org.apache.doris.monitor.jvm.JvmStats;
+import com.codahale.metrics.Histogram;
+import java.util.List;
+
+public class JsonMetricVisitor extends MetricVisitor {
+ private int ordinal = 0;
+ private int metricNumber = 0;
+
+ public JsonMetricVisitor(String prefix) {
+ super(prefix);
+ }
+
+ @Override
+ public void setMetricNumber(int metricNumber) {
+ this.metricNumber = metricNumber;
+ }
+
+ @Override
+ public void visitJvm(StringBuilder sb, JvmStats jvmStats) {
+ return;
+ }
+
+ @Override
+ public void visit(StringBuilder sb, @SuppressWarnings("rawtypes") Metric
metric) {
+ if (ordinal++ == 0) {
+ sb.append("[\n");
+ }
+ sb.append("{\n\t\"tags\":\n\t{\n");
+ sb.append("\t\t\"metric\":\"").append(metric.getName()).append("\"");
+
+ // name
+ @SuppressWarnings("unchecked")
+ List<MetricLabel> labels = metric.getLabels();
+ if (!labels.isEmpty()) {
+ sb.append(",\n");
+ int i = 0;
+ for (MetricLabel label : labels) {
+ if (i++ > 0) {
+ sb.append(",\n");
+ }
+
sb.append("\t\t\"").append(label.getKey()).append("\":\"").append(label.getValue()).append("\"");
+ }
+ }
+ sb.append("\n\t},\n");
+
sb.append("\t\"unit\":\"").append(metric.getUnit().name().toLowerCase()).append(
"\",\n");
+
+ // value
+
sb.append("\t\"value\":").append(metric.getValue().toString()).append("\n}");
+ if (ordinal < metricNumber) {
+ sb.append(",\n");
+ } else {
+ sb.append("\n]");
+ }
+ return;
+ }
+
+ @Override
+ public void visitHistogram(StringBuilder sb, String name, Histogram
histogram) {
+ return;
+ }
+
+ @Override
+ public void getNodeInfo(StringBuilder sb) {
+ return;
+ }
+}
+
diff --git a/fe/src/main/java/org/apache/doris/metric/LongCounterMetric.java
b/fe/src/main/java/org/apache/doris/metric/LongCounterMetric.java
index cf4c652..c56a616 100644
--- a/fe/src/main/java/org/apache/doris/metric/LongCounterMetric.java
+++ b/fe/src/main/java/org/apache/doris/metric/LongCounterMetric.java
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class LongCounterMetric extends CounterMetric<Long> {
- public LongCounterMetric(String name, String description) {
- super(name, description);
+ public LongCounterMetric(String name, MetricUnit unit, String description)
{
+ super(name, unit, description);
}
private AtomicLong value = new AtomicLong(0L);
diff --git a/fe/src/main/java/org/apache/doris/metric/Metric.java
b/fe/src/main/java/org/apache/doris/metric/Metric.java
index c4e6302..e5e029c 100644
--- a/fe/src/main/java/org/apache/doris/metric/Metric.java
+++ b/fe/src/main/java/org/apache/doris/metric/Metric.java
@@ -26,14 +26,28 @@ public abstract class Metric<T> {
GAUGE, COUNTER
}
+ public enum MetricUnit {
+ NANOSECONDS,
+ MICROSECONDS,
+ MILLISECONDS,
+ SECONDS,
+ BYTES,
+ ROWS,
+ NUMBER,
+ PERCENT,
+ NOUNIT
+ };
+
protected String name;
protected MetricType type;
+ protected MetricUnit unit;
protected List<MetricLabel> labels = Lists.newArrayList();
protected String description;
- public Metric(String name, MetricType type, String description) {
+ public Metric(String name, MetricType type, MetricUnit unit, String
description) {
this.name = name;
this.type = type;
+ this.unit = unit;
this.description = description;
}
@@ -45,6 +59,10 @@ public abstract class Metric<T> {
return type;
}
+ public MetricUnit getUnit() {
+ return unit;
+ }
+
public String getDescription() {
return description;
}
diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
index 8985f95..bbd3a7d 100644
--- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.monitor.jvm.JvmService;
import org.apache.doris.monitor.jvm.JvmStats;
import org.apache.doris.persist.EditLog;
@@ -96,7 +97,7 @@ public final class MetricRepo {
for (EtlJobType jobType : EtlJobType.values()) {
for (JobState state : JobState.values()) {
GaugeMetric<Long> gauge = (GaugeMetric<Long>) new
GaugeMetric<Long>("job",
- "job statistics") {
+ MetricUnit.NUMBER, "job statistics") {
@Override
public Long getValue() {
if (!Catalog.getInstance().isMaster()) {
@@ -120,7 +121,7 @@ public final class MetricRepo {
}
GaugeMetric<Long> gauge = (GaugeMetric<Long>) new
GaugeMetric<Long>("job",
- "job statistics") {
+ MetricUnit.NUMBER, "job statistics") {
@Override
public Long getValue() {
if (!Catalog.getInstance().isMaster()) {
@@ -144,7 +145,7 @@ public final class MetricRepo {
// connections
GaugeMetric<Integer> conections = (GaugeMetric<Integer>) new
GaugeMetric<Integer>(
- "connection_total", "total connections") {
+ "connection_total", MetricUnit.NUMBER, "total connections") {
@Override
public Integer getValue() {
return
ExecuteEnv.getInstance().getScheduler().getConnectionNum();
@@ -154,7 +155,7 @@ public final class MetricRepo {
// journal id
GaugeMetric<Long> maxJournalId = (GaugeMetric<Long>) new
GaugeMetric<Long>(
- "max_journal_id", "max journal id of this frontends") {
+ "max_journal_id", MetricUnit.NUMBER, "max journal id of this
frontends") {
@Override
public Long getValue() {
EditLog editLog = Catalog.getInstance().getEditLog();
@@ -168,7 +169,7 @@ public final class MetricRepo {
// scheduled tablet num
GaugeMetric<Long> scheduledTabletNum = (GaugeMetric<Long>) new
GaugeMetric<Long>(
- "scheduled_tablet_num", "number of tablets being scheduled") {
+ "scheduled_tablet_num", MetricUnit.NUMBER, "number of tablets
being scheduled") {
@Override
public Long getValue() {
if (!Catalog.getInstance().isMaster()) {
@@ -181,58 +182,58 @@ public final class MetricRepo {
// qps, rps and error rate
// these metrics should be set an init value, in case that metric
calculator is not running
- GAUGE_QUERY_PER_SECOND = new GaugeMetricImpl<>("qps", "query per
second");
+ GAUGE_QUERY_PER_SECOND = new GaugeMetricImpl<>("qps",
MetricUnit.NUMBER, "query per second");
GAUGE_QUERY_PER_SECOND.setValue(0.0);
PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_QUERY_PER_SECOND);
- GAUGE_REQUEST_PER_SECOND = new GaugeMetricImpl<>("rps", "request per
second");
+ GAUGE_REQUEST_PER_SECOND = new GaugeMetricImpl<>("rps",
MetricUnit.NUMBER, "request per second");
GAUGE_REQUEST_PER_SECOND.setValue(0.0);
PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_REQUEST_PER_SECOND);
- GAUGE_QUERY_ERR_RATE = new GaugeMetricImpl<>("query_err_rate", "query
error rate");
+ GAUGE_QUERY_ERR_RATE = new GaugeMetricImpl<>("query_err_rate",
MetricUnit.NUMBER, "query error rate");
PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_QUERY_ERR_RATE);
GAUGE_QUERY_ERR_RATE.setValue(0.0);
GAUGE_MAX_TABLET_COMPACTION_SCORE = new
GaugeMetricImpl<>("max_tablet_compaction_score",
- "max tablet compaction score of all backends");
+ MetricUnit.NUMBER, "max tablet compaction score of all
backends");
PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_MAX_TABLET_COMPACTION_SCORE);
GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(0L);
// 2. counter
- COUNTER_REQUEST_ALL = new LongCounterMetric("request_total", "total
request");
+ COUNTER_REQUEST_ALL = new LongCounterMetric("request_total",
MetricUnit.NUMBER, "total request");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_REQUEST_ALL);
- COUNTER_QUERY_ALL = new LongCounterMetric("query_total", "total
query");
+ COUNTER_QUERY_ALL = new LongCounterMetric("query_total",
MetricUnit.NUMBER, "total query");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_QUERY_ALL);
- COUNTER_QUERY_ERR = new LongCounterMetric("query_err", "total error
query");
+ COUNTER_QUERY_ERR = new LongCounterMetric("query_err",
MetricUnit.NUMBER, "total error query");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_QUERY_ERR);
- COUNTER_LOAD_ADD = new LongCounterMetric("load_add", "total load
submit");
+ COUNTER_LOAD_ADD = new LongCounterMetric("load_add",
MetricUnit.NUMBER, "total load submit");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_LOAD_ADD);
- COUNTER_LOAD_FINISHED = new LongCounterMetric("load_finished", "total
load finished");
+ COUNTER_LOAD_FINISHED = new LongCounterMetric("load_finished",
MetricUnit.NUMBER, "total load finished");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_LOAD_FINISHED);
- COUNTER_EDIT_LOG_WRITE = new LongCounterMetric("edit_log_write",
"counter of edit log write into bdbje");
+ COUNTER_EDIT_LOG_WRITE = new LongCounterMetric("edit_log_write",
MetricUnit.NUMBER, "counter of edit log write into bdbje");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_WRITE);
- COUNTER_EDIT_LOG_READ = new LongCounterMetric("edit_log_read",
"counter of edit log read from bdbje");
+ COUNTER_EDIT_LOG_READ = new LongCounterMetric("edit_log_read",
MetricUnit.NUMBER, "counter of edit log read from bdbje");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_READ);
- COUNTER_EDIT_LOG_SIZE_BYTES = new
LongCounterMetric("edit_log_size_bytes", "size of edit log");
+ COUNTER_EDIT_LOG_SIZE_BYTES = new
LongCounterMetric("edit_log_size_bytes", MetricUnit.BYTES, "size of edit log");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_SIZE_BYTES);
- COUNTER_IMAGE_WRITE = new LongCounterMetric("image_write", "counter of
image generated");
+ COUNTER_IMAGE_WRITE = new LongCounterMetric("image_write",
MetricUnit.NUMBER, "counter of image generated");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_WRITE);
- COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push",
+ COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push",
MetricUnit.NUMBER,
"counter of image succeeded in pushing to other frontends");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH);
- COUNTER_TXN_REJECT = new LongCounterMetric("txn_reject", "counter of
rejected transactions");
+ COUNTER_TXN_REJECT = new LongCounterMetric("txn_reject",
MetricUnit.NUMBER, "counter of rejected transactions");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_REJECT);
- COUNTER_TXN_BEGIN = new LongCounterMetric("txn_begin", "counter of
begining transactions");
+ COUNTER_TXN_BEGIN = new LongCounterMetric("txn_begin",
MetricUnit.NUMBER, "counter of begining transactions");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_BEGIN);
- COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success", "counter of
success transactions");
+ COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success",
MetricUnit.NUMBER, "counter of success transactions");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_SUCCESS);
- COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed", "counter of
failed transactions");
+ COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed",
MetricUnit.NUMBER, "counter of failed transactions");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_FAILED);
- COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows",
"total rows of routine load");
+ COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows",
MetricUnit.ROWS, "total rows of routine load");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_ROWS);
- COUNTER_ROUTINE_LOAD_RECEIVED_BYTES = new
LongCounterMetric("routine_load_receive_bytes",
+ COUNTER_ROUTINE_LOAD_RECEIVED_BYTES = new
LongCounterMetric("routine_load_receive_bytes", MetricUnit.BYTES,
"total received bytes of routine load");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_RECEIVED_BYTES);
- COUNTER_ROUTINE_LOAD_ERROR_ROWS = new
LongCounterMetric("routine_load_error_rows",
+ COUNTER_ROUTINE_LOAD_ERROR_ROWS = new
LongCounterMetric("routine_load_error_rows", MetricUnit.ROWS,
"total error rows of routine load");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_ROUTINE_LOAD_ERROR_ROWS);
@@ -266,7 +267,7 @@ public final class MetricRepo {
// tablet number of each backends
GaugeMetric<Long> tabletNum = (GaugeMetric<Long>) new
GaugeMetric<Long>(TABLET_NUM,
- "tablet number") {
+ MetricUnit.NUMBER, "tablet number") {
@Override
public Long getValue() {
if (!Catalog.getInstance().isMaster()) {
@@ -280,7 +281,7 @@ public final class MetricRepo {
// max compaction score of tablets on each backends
GaugeMetric<Long> tabletMaxCompactionScore = (GaugeMetric<Long>)
new GaugeMetric<Long>(
- TABLET_MAX_COMPACTION_SCORE,
+ TABLET_MAX_COMPACTION_SCORE, MetricUnit.NUMBER,
"tablet max compaction score") {
@Override
public Long getValue() {
@@ -306,6 +307,7 @@ public final class MetricRepo {
JvmStats jvmStats = jvmService.stats();
visitor.visitJvm(sb, jvmStats);
+ visitor.setMetricNumber(PALO_METRIC_REGISTER.getPaloMetrics().size());
// doris metrics
for (Metric metric : PALO_METRIC_REGISTER.getPaloMetrics()) {
visitor.visit(sb, metric);
diff --git a/fe/src/main/java/org/apache/doris/metric/MetricVisitor.java
b/fe/src/main/java/org/apache/doris/metric/MetricVisitor.java
index 7abb998..681b6df 100644
--- a/fe/src/main/java/org/apache/doris/metric/MetricVisitor.java
+++ b/fe/src/main/java/org/apache/doris/metric/MetricVisitor.java
@@ -32,6 +32,8 @@ public abstract class MetricVisitor {
this.prefix = prefix;
}
+ public abstract void setMetricNumber(int metricNumber);
+
public abstract void visitJvm(StringBuilder sb, JvmStats jvmStats);
public abstract void visit(StringBuilder sb, Metric metric);
diff --git
a/fe/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java
b/fe/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java
index 6a98b94..d4a638d 100644
--- a/fe/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java
+++ b/fe/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java
@@ -52,11 +52,19 @@ public class PrometheusMetricVisitor extends MetricVisitor {
private static final String HELP = "# HELP ";
private static final String TYPE = "# TYPE ";
+ private int ordinal = 0;
+ private int metricNumber = 0;
+
public PrometheusMetricVisitor(String prefix) {
super(prefix);
}
@Override
+ public void setMetricNumber(int metricNumber) {
+ this.metricNumber = metricNumber;
+ }
+
+ @Override
public void visitJvm(StringBuilder sb, JvmStats jvmStats) {
// heap
sb.append(Joiner.on(" ").join(HELP, JVM_HEAP_SIZE_BYTES, "jvm heap
stat\n"));
diff --git
a/fe/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java
b/fe/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java
index cf832ad..a65f742 100644
--- a/fe/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java
+++ b/fe/src/main/java/org/apache/doris/metric/SimpleCoreMetricVisitor.java
@@ -56,6 +56,9 @@ public class SimpleCoreMetricVisitor extends MetricVisitor {
public static final String MAX_TABLET_COMPACTION_SCORE =
"max_tablet_compaction_score";
+ private int ordinal = 0;
+ private int metricNumber = 0;
+
private static final Map<String, String> CORE_METRICS = Maps.newHashMap();
static {
CORE_METRICS.put(MAX_JOURMAL_ID, TYPE_LONG);
@@ -72,6 +75,11 @@ public class SimpleCoreMetricVisitor extends MetricVisitor {
}
@Override
+ public void setMetricNumber(int metricNumber) {
+ this.metricNumber = metricNumber;
+ }
+
+ @Override
public void visitJvm(StringBuilder sb, JvmStats jvmStats) {
Iterator<MemoryPool> memIter = jvmStats.getMem().iterator();
while (memIter.hasNext()) {
@@ -134,4 +142,4 @@ public class SimpleCoreMetricVisitor extends MetricVisitor {
sb.append(prefix + "_backend_dead_num").append("
").append(String.valueOf(beDeadNum)).append("\n");
sb.append(prefix + "_broker_dead_num").append("
").append(String.valueOf(brokerDeadNum)).append("\n");
}
-}
\ No newline at end of file
+}
diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index fcbf6f3..d94733a 100644
--- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -211,10 +211,10 @@ public class ConnectProcessor {
// TODO(cmy): when user send multi-statement, the executor is the last
statement's executor.
// We may need to find some way to resolve this.
if (executor != null) {
- auditAfterExec(originStmt.replace("\n", " \\n"),
executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog());
+ auditAfterExec(originStmt.replace("\n", " "),
executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog());
} else {
// executor can be null if we encounter analysis error.
- auditAfterExec(originStmt.replace("\n", " \\n"), null, null);
+ auditAfterExec(originStmt.replace("\n", " "), null, null);
}
}
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 90133e2..b5ccb65 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -56,6 +56,7 @@ import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlEofPacket;
@@ -707,6 +708,7 @@ public class StmtExecutor {
TabletCommitInfo.fromThrift(coord.getCommitInfos()),
10000)) {
txnStatus = TransactionStatus.VISIBLE;
+ MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
} else {
txnStatus = TransactionStatus.COMMITTED;
}
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 6677050..3ec7032 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -42,6 +42,7 @@ import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.MiniEtlTaskInfo;
import org.apache.doris.master.MasterImpl;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.plugin.AuditEvent;
@@ -696,6 +697,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() :
Config.stream_load_default_timeout_second;
+ MetricRepo.COUNTER_LOAD_ADD.increase(1L);
return Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), Lists.newArrayList(table.getId()),
request.getLabel(), request.getRequest_id(),
new TxnCoordinator(TxnSourceType.BE, clientIp),
@@ -757,10 +759,15 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
throw new UserException("unknown database, database=" + dbName);
}
- return
Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
- db, request.getTxnId(),
- TabletCommitInfo.fromThrift(request.getCommitInfos()),
- 5000,
TxnCommitAttachment.fromThrift(request.txnCommitAttachment));
+ boolean ret =
Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
+ db, request.getTxnId(),
+ TabletCommitInfo.fromThrift(request.getCommitInfos()),
+ 5000,
TxnCommitAttachment.fromThrift(request.txnCommitAttachment));
+ if (ret) {
+ // if commit and publish is success, load can be regarded as
success
+ MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
+ }
+ return ret;
}
@Override
diff --git a/fe/src/main/java/org/apache/doris/task/AgentTaskQueue.java
b/fe/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index 36c0ef6..96aba8e 100644
--- a/fe/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -21,8 +21,10 @@ import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TTaskType;
import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
@@ -215,6 +217,19 @@ public class AgentTaskQueue {
return taskNum;
}
+ public static synchronized Multimap<Long, Long>
getTabletIdsByType(TTaskType type) {
+ Multimap<Long, Long> tabletIds = HashMultimap.create();
+ Map<Long, Map<Long, AgentTask>> taskMap = tasks.column(type);
+ if (taskMap != null) {
+ for (Map<Long, AgentTask> signatureMap : taskMap.values()) {
+ for (AgentTask task : signatureMap.values()) {
+ tabletIds.put(task.getDbId(), task.getTabletId());
+ }
+ }
+ }
+ return tabletIds;
+ }
+
public static synchronized int getTaskNum(long backendId, TTaskType type,
boolean isFailed) {
int taskNum = 0;
if (backendId != -1) {
diff --git
a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index da73c8c..9c7192a 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -35,6 +35,7 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.Load;
import org.apache.doris.load.Source;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.transaction.TransactionState;
@@ -44,6 +45,8 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.util.List;
@@ -58,6 +61,11 @@ import mockit.Mocked;
public class BrokerLoadJobTest {
+ @BeforeClass
+ public static void start() {
+ MetricRepo.init();
+ }
+
@Test
public void testFromLoadStmt(@Injectable LoadStmt loadStmt,
@Injectable LabelName labelName,
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
index d99a4c1..dd69932 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
@@ -39,6 +39,8 @@ import org.apache.doris.transaction.TransactionState;
import com.google.common.collect.Maps;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Map;
@@ -48,6 +50,11 @@ import mockit.Mocked;
public class LoadJobTest {
+ @BeforeClass
+ public static void start() {
+ MetricRepo.init();
+ }
+
@Test
public void testGetDbNotExists(@Mocked Catalog catalog) {
LoadJob loadJob = new BrokerLoadJob();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]