This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9691db7918 [Enhancement](metrics) add more metrics (#11693) 9691db7918 is described below commit 9691db791807419e9cd017680e286c6fae65a68d Author: ccoffline <45881148+ccoffl...@users.noreply.github.com> AuthorDate: Wed Oct 26 08:31:03 2022 +0800 [Enhancement](metrics) add more metrics (#11693) * Add `AutoMappedMetric` to measure dynamic object. * Add query instance and rpc metrics * Add thrift rpc metrics * Add txn metrics * Reorganize metrics init routine. Co-authored-by: 迟成 <chich...@meituan.com> --- .../maint-monitor/monitor-metrics/metrics.md | 13 +- .../org/apache/doris/common/ThreadPoolManager.java | 53 +++++--- .../org/apache/doris/metric/AutoMappedMetric.java | 37 ++++++ .../java/org/apache/doris/metric/MetricRepo.java | 144 +++++++++++++++------ .../org/apache/doris/planner/OlapTableSink.java | 11 ++ .../java/org/apache/doris/qe/ConnectProcessor.java | 1 + .../main/java/org/apache/doris/qe/Coordinator.java | 3 + .../java/org/apache/doris/qe/QeProcessorImpl.java | 2 + .../org/apache/doris/rpc/BackendServiceProxy.java | 3 + .../java/org/apache/doris/service/FeServer.java | 18 ++- .../doris/transaction/DatabaseTransactionMgr.java | 39 +++++- .../doris/transaction/GlobalTransactionMgr.java | 28 ++++ .../doris/transaction/PublishVersionDaemon.java | 5 + .../apache/doris/transaction/TransactionState.java | 9 ++ .../apache/doris/common/ThreadPoolManagerTest.java | 7 - 15 files changed, 303 insertions(+), 70 deletions(-) diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md index 12996ef240..206f021e2e 100644 --- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md +++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md @@ -96,6 +96,7 @@ curl http://be_host:webserver_port/metrics?type=json |`doris_fe_query_err`| | Num | 错误查询的累积值 | | 
|`doris_fe_query_err_rate`| | Num/Sec| 每秒错误查询数 | 观察集群是否出现查询错误 | P0 | |`doris_fe_query_latency_ms`| | 毫秒| 查询请求延迟的百分位统计。如 {quantile="0.75"} 表示 75 分位的查询延迟 | 详细观察各分位查询延迟 | P0 | +|| | 毫秒| 各个DB的查询请求延迟的百分位统计。如 {quantile="0.75",db="test"} 表示DB test 75 分位的查询延迟 | 详细观察各DB各分位查询延迟 | P0 | |`doris_fe_query_olap_table`| | Num| 查询内部表(OlapTable)的请求个数统计 | | |`doris_fe_query_total`| | Num | 所有查询请求的累积计数 | | |`doris_fe_report_queue_size`| | Num | BE的各种定期汇报任务在FE端的队列长度 | 该值反映了汇报任务在 Master FE 节点上的阻塞程度,数值越大,表示FE处理能力不足 | P0| @@ -131,7 +132,17 @@ curl http://be_host:webserver_port/metrics?type=json || {type="reject"} | Num| 被拒绝的事务数量。(如当前运行事务数大于阈值,则新的事务会被拒绝)| | || {type="succes"} | Num| 成功的事务数量| | |`doris_fe_txn_status`| | Num | 统计当前处于各个状态的导入事务的数量。如 {type="committed"} 表示处于 committed 状态的事务的数量 | 可以观测各个状态下导入事务的数量,来判断是否有堆积 | P0 | -|`doris_fe_max_instances_num_per_user`|| Num| 当前连接用户中,发起fragment instance最多的用户的 instance 数目 |该数值可以用于观测当前是否有用户占用过多查询资源| P0 | +|`doris_fe_query_instance_num`|| Num| 指定用户当前正在请求的fragment instance数目。如 {user="test_u"} 表示用户 test_u 当前正在请求的 instance 数目 |该数值可以用于观测指定用户是否占用过多查询资源| P0 | +|`doris_fe_query_instance_begin`|| Num| 指定用户请求开始的fragment instance数目。如 {user="test_u"} 表示用户 test_u 开始请求的 instance 数目 |该数值可以用于观测指定用户是否提交了过多查询| P0 | +|`doris_fe_query_rpc_total`|| Num| 发往指定BE的RPC次数。如 {be="192.168.10.1"} 表示发往ip为 192.168.10.1 的BE的RPC次数 |该数值可以观测是否向某个BE提交了过多RPC| | +|`doris_fe_query_rpc_failed`|| Num| 发往指定BE的RPC失败次数。如 {be="192.168.10.1"} 表示发往ip为 192.168.10.1 的BE的RPC失败次数 |该数值可以观测某个BE是否存在RPC问题| | +|`doris_fe_query_rpc_size`|| Num| 指定BE的RPC数据大小。如 {be="192.168.10.1"} 表示发往ip为 192.168.10.1 的BE的RPC数据字节数 |该数值可以观测是否向某个BE提交了过大的RPC| | +|`doris_fe_txn_exec_latency_ms`| | 毫秒| 事务执行耗时的百分位统计。如 {quantile="0.75"} 表示 75 分位的事务执行耗时 | 详细观察各分位事务执行耗时 | P0 | +|`doris_fe_txn_publish_latency_ms`| | 毫秒| 事务publish耗时的百分位统计。如 {quantile="0.75"} 表示 75 分位的事务publish耗时 | 详细观察各分位事务publish耗时 | P0 | +|`doris_fe_txn_num`|| Num| 指定DB正在执行的事务数。如 {db="test"} 表示DB test 当前正在执行的事务数 |该数值可以观测某个DB是否提交了大量事务| P0 | 
+|`doris_fe_txn_replica_num`|| Num| 指定DB正在执行的事务打开的副本数。如 {db="test"} 表示DB test 当前正在执行的事务打开的副本数 |该数值可以观测某个DB是否打开了过多的副本,可能会影响其他事务执行| P0 | +|`doris_fe_thrift_rpc_total`|| Num| FE thrift接口各个方法接收的RPC请求次数。如 {method="report"} 表示 report 方法接收的RPC请求次数 |该数值可以观测某个thrift rpc方法的负载| | +|`doris_fe_thrift_rpc_latency_ms`|| 毫秒| FE thrift接口各个方法接收的RPC请求耗时。如 {method="report"} 表示 report 方法接收的RPC请求耗时 |该数值可以观测某个thrift rpc方法的负载| | ### JVM 监控 diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java index 96e4b114d9..cbd3312b65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -17,7 +17,7 @@ package org.apache.doris.common; -import org.apache.doris.metric.GaugeMetric; +import org.apache.doris.metric.Metric; import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.metric.MetricLabel; import org.apache.doris.metric.MetricRepo; @@ -37,6 +37,8 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; /** * ThreadPoolManager is a helper class for construct daemon thread pool with limit thread and memory resource. 
@@ -75,29 +77,35 @@ public class ThreadPoolManager { } public static void registerThreadPoolMetric(String poolName, ThreadPoolExecutor threadPool) { - for (String poolMetricType : poolMetricTypes) { - GaugeMetric<Integer> gauge = new GaugeMetric<Integer>( - "thread_pool", MetricUnit.NOUNIT, "thread_pool statistics") { - @Override - public Integer getValue() { - String metricType = this.getLabels().get(1).getValue(); - switch (metricType) { - case "pool_size": - return threadPool.getPoolSize(); - case "active_thread_num": - return threadPool.getActiveCount(); - case "task_in_queue": - return threadPool.getQueue().size(); - default: - return 0; - } - } - }; - gauge.addLabel(new MetricLabel("name", poolName)).addLabel(new MetricLabel("type", poolMetricType)); - MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge); + Metric.MetricType gauge = Metric.MetricType.GAUGE; + Metric.MetricType counter = Metric.MetricType.COUNTER; + MetricUnit nounit = MetricUnit.NOUNIT; + registerMetric(poolName, "pool_size", gauge, nounit, threadPool::getPoolSize); + registerMetric(poolName, "active_thread_num", gauge, nounit, threadPool::getActiveCount); + registerMetric(poolName, "active_thread_pct", gauge, MetricUnit.PERCENT, + () -> 1.0 * threadPool.getActiveCount() / threadPool.getMaximumPoolSize()); + registerMetric(poolName, "task_in_queue", gauge, nounit, () -> threadPool.getQueue().size()); + registerMetric(poolName, "task_count", counter, nounit, threadPool::getTaskCount); + registerMetric(poolName, "completed_task_count", counter, nounit, threadPool::getCompletedTaskCount); + RejectedExecutionHandler rejectedHandler = threadPool.getRejectedExecutionHandler(); + if (rejectedHandler instanceof LogDiscardPolicy) { + registerMetric(poolName, "task_rejected", counter, nounit, + ((LogDiscardPolicy) rejectedHandler).rejectedNum::get); } } + private static <T> void registerMetric(String poolName, String metricName, + Metric.MetricType type, MetricUnit unit, Supplier<T> supplier) { + 
Metric<T> gauge = new Metric<T>("thread_pool", type, unit, "thread_pool statistics") { + @Override + public T getValue() { + return supplier.get(); + } + }; + gauge.addLabel(new MetricLabel("name", poolName)).addLabel(new MetricLabel("type", metricName)); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge); + } + public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread, String poolName, boolean needRegisterMetric) { return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, @@ -165,14 +173,17 @@ public class ThreadPoolManager { private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class); private String threadPoolName; + private AtomicLong rejectedNum; public LogDiscardPolicy(String threadPoolName) { this.threadPoolName = threadPoolName; + this.rejectedNum = new AtomicLong(0); } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString()); + this.rejectedNum.incrementAndGet(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java b/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java new file mode 100644 index 0000000000..17b7e1a104 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.metric; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +public class AutoMappedMetric<M> { + + private final Map<String, M> nameToMetric = new ConcurrentHashMap<>(); + private final Function<String, M> metricSupplier; + + public AutoMappedMetric(Function<String, M> metricSupplier) { + this.metricSupplier = metricSupplier; + } + + public M getOrAdd(String name) { + return nameToMetric.computeIfAbsent(name, metricSupplier); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index c12fb11d75..9991c651d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -51,7 +51,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.function.BinaryOperator; +import java.util.function.Supplier; public final class MetricRepo { private static final Logger LOG = LogManager.getLogger(MetricRepo.class); @@ -71,6 +71,13 @@ public final class MetricRepo { public static LongCounterMetric COUNTER_QUERY_ERR; public static LongCounterMetric COUNTER_QUERY_TABLE; public static LongCounterMetric COUNTER_QUERY_OLAP_TABLE; + public static Histogram HISTO_QUERY_LATENCY; + public static AutoMappedMetric<Histogram> DB_HISTO_QUERY_LATENCY; + public static 
AutoMappedMetric<GaugeMetricImpl<Long>> USER_GAUGE_QUERY_INSTANCE_NUM; + public static AutoMappedMetric<LongCounterMetric> USER_COUNTER_QUERY_INSTANCE_BEGIN; + public static AutoMappedMetric<LongCounterMetric> BE_COUNTER_QUERY_RPC_ALL; + public static AutoMappedMetric<LongCounterMetric> BE_COUNTER_QUERY_RPC_FAILED; + public static AutoMappedMetric<LongCounterMetric> BE_COUNTER_QUERY_RPC_SIZE; public static LongCounterMetric COUNTER_CACHE_ADDED_SQL; public static LongCounterMetric COUNTER_CACHE_ADDED_PARTITION; @@ -80,27 +87,33 @@ public final class MetricRepo { public static LongCounterMetric COUNTER_EDIT_LOG_WRITE; public static LongCounterMetric COUNTER_EDIT_LOG_READ; public static LongCounterMetric COUNTER_EDIT_LOG_SIZE_BYTES; + public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_SUCCESS; + public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_FAILED; + public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY; + public static LongCounterMetric COUNTER_IMAGE_WRITE_SUCCESS; public static LongCounterMetric COUNTER_IMAGE_WRITE_FAILED; public static LongCounterMetric COUNTER_IMAGE_PUSH_SUCCESS; public static LongCounterMetric COUNTER_IMAGE_PUSH_FAILED; public static LongCounterMetric COUNTER_IMAGE_CLEAN_SUCCESS; public static LongCounterMetric COUNTER_IMAGE_CLEAN_FAILED; - public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_SUCCESS; - public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_FAILED; public static LongCounterMetric COUNTER_TXN_REJECT; public static LongCounterMetric COUNTER_TXN_BEGIN; public static LongCounterMetric COUNTER_TXN_FAILED; public static LongCounterMetric COUNTER_TXN_SUCCESS; + public static Histogram HISTO_TXN_EXEC_LATENCY; + public static Histogram HISTO_TXN_PUBLISH_LATENCY; + public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_TXN_NUM; + public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_TXN_REPLICA_NUM; public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS; public static LongCounterMetric 
COUNTER_ROUTINE_LOAD_RECEIVED_BYTES; public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS; public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE; - public static Histogram HISTO_QUERY_LATENCY; - public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY; + public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_ALL; + public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_LATENCY; // following metrics will be updated by metric calculator public static GaugeMetricImpl<Double> GAUGE_QUERY_PER_SECOND; @@ -118,7 +131,6 @@ public final class MetricRepo { return; } - // 1. gauge // load jobs LoadManager loadManger = Env.getCurrentEnv().getLoadManager(); for (EtlJobType jobType : EtlJobType.values()) { @@ -228,21 +240,6 @@ public final class MetricRepo { }; DORIS_METRIC_REGISTER.addMetrics(scheduledTabletNum); - GaugeMetric<Long> maxInstanceNum = new GaugeMetric<Long>("max_instances_num_per_user", - MetricUnit.NOUNIT, "max instances num of all current users") { - @Override - public Long getValue() { - try { - return ((QeProcessorImpl) QeProcessorImpl.INSTANCE).getInstancesNumPerUser().values().stream() - .reduce(-1, BinaryOperator.maxBy(Integer::compareTo)).longValue(); - } catch (Throwable ex) { - LOG.warn("Get max_instances_num_per_user error", ex); - return -2L; - } - } - }; - DORIS_METRIC_REGISTER.addMetrics(maxInstanceNum); - // txn status for (TransactionStatus status : TransactionStatus.values()) { GaugeMetric<Long> gauge = new GaugeMetric<Long>("txn_status", MetricUnit.NOUNIT, "txn statistics") { @@ -274,19 +271,53 @@ public final class MetricRepo { DORIS_METRIC_REGISTER.addMetrics(GAUGE_MAX_TABLET_COMPACTION_SCORE); GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(0L); - // 2. 
counter + // query COUNTER_REQUEST_ALL = new LongCounterMetric("request_total", MetricUnit.REQUESTS, "total request"); DORIS_METRIC_REGISTER.addMetrics(COUNTER_REQUEST_ALL); COUNTER_QUERY_ALL = new LongCounterMetric("query_total", MetricUnit.REQUESTS, "total query"); DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_ALL); COUNTER_QUERY_ERR = new LongCounterMetric("query_err", MetricUnit.REQUESTS, "total error query"); DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_ERR); - COUNTER_QUERY_TABLE = new LongCounterMetric("query_table", MetricUnit.REQUESTS, "total query from table"); DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_TABLE); COUNTER_QUERY_OLAP_TABLE = new LongCounterMetric("query_olap_table", MetricUnit.REQUESTS, "total query from olap table"); DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_OLAP_TABLE); + HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram( + MetricRegistry.name("query", "latency", "ms")); + DB_HISTO_QUERY_LATENCY = new AutoMappedMetric<>(name -> { + String metricName = MetricRegistry.name("query", "latency", "ms", "db=" + name); + return METRIC_REGISTER.histogram(metricName); + }); + USER_COUNTER_QUERY_INSTANCE_BEGIN = addLabeledMetrics("user", () -> + new LongCounterMetric("query_instance_begin", MetricUnit.NOUNIT, + "number of query instance begin")); + USER_GAUGE_QUERY_INSTANCE_NUM = addLabeledMetrics("user", () -> + new GaugeMetricImpl<>("query_instance_num", MetricUnit.NOUNIT, + "number of running query instances of current user")); + GaugeMetric<Long> queryInstanceNum = new GaugeMetric<Long>("query_instance_num", + MetricUnit.NOUNIT, "number of query instances of all current users") { + @Override + public Long getValue() { + QeProcessorImpl qe = ((QeProcessorImpl) QeProcessorImpl.INSTANCE); + long totalInstanceNum = 0; + for (Map.Entry<String, Integer> e : qe.getInstancesNumPerUser().entrySet()) { + long value = e.getValue() == null ? 
0L : e.getValue().longValue(); + totalInstanceNum += value; + USER_GAUGE_QUERY_INSTANCE_NUM.getOrAdd(e.getKey()).setValue(value); + } + return totalInstanceNum; + } + }; + DORIS_METRIC_REGISTER.addMetrics(queryInstanceNum); + BE_COUNTER_QUERY_RPC_ALL = addLabeledMetrics("be", () -> + new LongCounterMetric("query_rpc_total", MetricUnit.NOUNIT, "")); + BE_COUNTER_QUERY_RPC_FAILED = addLabeledMetrics("be", () -> + new LongCounterMetric("query_rpc_failed", MetricUnit.NOUNIT, "")); + BE_COUNTER_QUERY_RPC_SIZE = addLabeledMetrics("be", () -> + new LongCounterMetric("query_rpc_size", MetricUnit.BYTES, "")); + + // cache COUNTER_CACHE_ADDED_SQL = new LongCounterMetric("cache_added", MetricUnit.REQUESTS, "Number of SQL mode cache added"); COUNTER_CACHE_ADDED_SQL.addLabel(new MetricLabel("type", "sql")); @@ -304,6 +335,7 @@ public final class MetricRepo { COUNTER_CACHE_HIT_PARTITION.addLabel(new MetricLabel("type", "partition")); DORIS_METRIC_REGISTER.addMetrics(COUNTER_CACHE_HIT_PARTITION); + // edit log COUNTER_EDIT_LOG_WRITE = new LongCounterMetric("edit_log", MetricUnit.OPERATIONS, "counter of edit log write into bdbje"); COUNTER_EDIT_LOG_WRITE.addLabel(new MetricLabel("type", "write")); @@ -315,6 +347,18 @@ public final class MetricRepo { COUNTER_EDIT_LOG_SIZE_BYTES = new LongCounterMetric("edit_log", MetricUnit.BYTES, "size of edit log"); COUNTER_EDIT_LOG_SIZE_BYTES.addLabel(new MetricLabel("type", "bytes")); DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_SIZE_BYTES); + HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram( + MetricRegistry.name("editlog", "write", "latency", "ms")); + + // edit log clean + COUNTER_EDIT_LOG_CLEAN_SUCCESS = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS, + "counter of edit log succeed in cleaning"); + COUNTER_EDIT_LOG_CLEAN_SUCCESS.addLabel(new MetricLabel("type", "success")); + DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_SUCCESS); + COUNTER_EDIT_LOG_CLEAN_FAILED = new LongCounterMetric("edit_log_clean", 
MetricUnit.OPERATIONS, + "counter of edit log failed to clean"); + COUNTER_EDIT_LOG_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed")); + DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_FAILED); // image generate COUNTER_IMAGE_WRITE_SUCCESS = new LongCounterMetric("image_write", MetricUnit.OPERATIONS, @@ -345,16 +389,7 @@ public final class MetricRepo { COUNTER_IMAGE_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed")); DORIS_METRIC_REGISTER.addMetrics(COUNTER_IMAGE_CLEAN_FAILED); - // edit log clean - COUNTER_EDIT_LOG_CLEAN_SUCCESS = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS, - "counter of edit log succeed in cleaning"); - COUNTER_EDIT_LOG_CLEAN_SUCCESS.addLabel(new MetricLabel("type", "success")); - DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_SUCCESS); - COUNTER_EDIT_LOG_CLEAN_FAILED = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS, - "counter of edit log failed to clean"); - COUNTER_EDIT_LOG_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed")); - DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_FAILED); - + // txn COUNTER_TXN_REJECT = new LongCounterMetric("txn_counter", MetricUnit.REQUESTS, "counter of rejected transactions"); COUNTER_TXN_REJECT.addLabel(new MetricLabel("type", "reject")); @@ -371,6 +406,30 @@ public final class MetricRepo { "counter of failed transactions"); COUNTER_TXN_FAILED.addLabel(new MetricLabel("type", "failed")); DORIS_METRIC_REGISTER.addMetrics(COUNTER_TXN_FAILED); + HISTO_TXN_EXEC_LATENCY = METRIC_REGISTER.histogram( + MetricRegistry.name("txn", "exec", "latency", "ms")); + HISTO_TXN_PUBLISH_LATENCY = METRIC_REGISTER.histogram( + MetricRegistry.name("txn", "publish", "latency", "ms")); + GaugeMetric<Long> txnNum = new GaugeMetric<Long>("txn_num", MetricUnit.NOUNIT, + "number of running transactions") { + @Override + public Long getValue() { + return Env.getCurrentGlobalTransactionMgr().getAllRunningTxnNum(); + } + }; + 
DORIS_METRIC_REGISTER.addMetrics(txnNum); + DB_GAUGE_TXN_NUM = addLabeledMetrics("db", () -> + new GaugeMetricImpl<>("txn_num", MetricUnit.NOUNIT, "number of running transactions")); + GaugeMetric<Long> txnReplicaNum = new GaugeMetric<Long>("txn_replica_num", MetricUnit.NOUNIT, + "number of writing tablets in all running transactions") { + @Override + public Long getValue() { + return Env.getCurrentGlobalTransactionMgr().getAllRunningTxnReplicaNum(); + } + }; + DORIS_METRIC_REGISTER.addMetrics(txnReplicaNum); + DB_GAUGE_TXN_REPLICA_NUM = addLabeledMetrics("db", () -> new GaugeMetricImpl<>("txn_replica_num", + MetricUnit.NOUNIT, "number of writing tablets in all running transactions")); COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows", MetricUnit.ROWS, "total rows of routine load"); @@ -385,11 +444,11 @@ public final class MetricRepo { COUNTER_HIT_SQL_BLOCK_RULE = new LongCounterMetric("counter_hit_sql_block_rule", MetricUnit.ROWS, "total hit sql block rule query"); DORIS_METRIC_REGISTER.addMetrics(COUNTER_HIT_SQL_BLOCK_RULE); - // 3. 
histogram - HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram( - MetricRegistry.name("query", "latency", "ms")); - HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram( - MetricRegistry.name("editlog", "write", "latency", "ms")); + + THRIFT_COUNTER_RPC_ALL = addLabeledMetrics("method", () -> + new LongCounterMetric("thrift_rpc_total", MetricUnit.NOUNIT, "")); + THRIFT_COUNTER_RPC_LATENCY = addLabeledMetrics("method", () -> + new LongCounterMetric("thrift_rpc_latency_ms", MetricUnit.MILLISECONDS, "")); // init system metrics initSystemMetrics(); @@ -587,6 +646,15 @@ public final class MetricRepo { return sb.toString(); } + public static <M extends Metric<?>> AutoMappedMetric<M> addLabeledMetrics(String label, Supplier<M> metric) { + return new AutoMappedMetric<>(value -> { + M m = metric.get(); + m.addLabel(new MetricLabel(label, value)); + MetricRepo.DORIS_METRIC_REGISTER.addMetrics(m); + return m; + }); + } + // update some metrics to make a ready to be visited private static void updateMetrics() { SYSTEM_METRICS.update(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index ef470cf711..5a2eddb17a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -62,6 +62,7 @@ import org.apache.doris.thrift.TPaloNodesInfo; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TTabletLocation; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.transaction.DatabaseTransactionMgr; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; @@ -346,6 +347,7 @@ public class OlapTableSink extends DataSink { TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); // BE id -> path hash Multimap<Long, Long> allBePathsMap = HashMultimap.create(); + int replicaNum = 0; for (Long 
partitionId : partitionIds) { Partition partition = table.getPartition(partitionId); int quorum = table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1; @@ -375,6 +377,7 @@ public class OlapTableSink extends DataSink { Lists.newArrayList(bePathsMap.keySet()))); } allBePathsMap.putAll(bePathsMap); + replicaNum += bePathsMap.size(); } } } @@ -385,6 +388,14 @@ public class OlapTableSink extends DataSink { if (!st.ok()) { throw new DdlException(st.getErrorMsg()); } + long dbId = tDataSink.getOlapTableSink().getDbId(); + long txnId = tDataSink.getOlapTableSink().getTxnId(); + try { + DatabaseTransactionMgr mgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId); + mgr.registerTxnReplicas(txnId, replicaNum); + } catch (Exception e) { + LOG.error("register txn replica failed, txnId={}, dbId={}", txnId, dbId); + } return Arrays.asList(locationParam, slaveLocationParam); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index f424adf7b1..847bb5baf0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -192,6 +192,7 @@ public class ConnectProcessor { } else if (ctx.getState().getStateType() == MysqlStateType.OK) { // ok query MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs); + MetricRepo.DB_HISTO_QUERY_LATENCY.getOrAdd(ctx.getDatabase()).update(elapseMs); if (elapseMs > Config.qe_slow_log_ms) { String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); ctx.getAuditEventBuilder().setSqlDigest(sqlDigest); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 13809d5633..7ac58111aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -38,6 +38,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.loadv2.LoadJob; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataSink; import org.apache.doris.planner.DataStreamSink; @@ -724,8 +725,10 @@ public class Coordinator { cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); switch (code) { case TIMEOUT: + MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(pair.first.brpcAddr.hostname).increase(1L); throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception); case THRIFT_RPC_ERROR: + MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(pair.first.brpcAddr.hostname).increase(1L); SimpleScheduler.addToBlacklist(pair.first.beId, errMsg); throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception); default: diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index fbf86d4347..1bf11ece97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -22,6 +22,7 @@ import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ProfileWriter; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; @@ -113,6 +114,7 @@ public final class QeProcessorImpl implements QeProcessor { } queryToInstancesNum.put(queryId, instancesNum); userToInstancesCount.computeIfAbsent(user, ignored -> new AtomicInteger(0)).addAndGet(instancesNum); + 
MetricRepo.USER_COUNTER_QUERY_INSTANCE_BEGIN.getOrAdd(user).increase(instancesNum.longValue()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 7316782251..c58defa488 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -18,6 +18,7 @@ package org.apache.doris.rpc; import org.apache.doris.common.Config; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest; import org.apache.doris.proto.Types; @@ -122,6 +123,8 @@ public class BackendServiceProxy { builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_2); final InternalService.PExecPlanFragmentRequest pRequest = builder.build(); + MetricRepo.BE_COUNTER_QUERY_RPC_ALL.getOrAdd(address.hostname).increase(1L); + MetricRepo.BE_COUNTER_QUERY_RPC_SIZE.getOrAdd(address.hostname).increase((long) pRequest.getSerializedSize()); try { final BackendServiceClient client = getProxy(address); if (twoPhaseExecution) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FeServer.java b/fe/fe-core/src/main/java/org/apache/doris/service/FeServer.java index 0c64799a3c..dd7b203833 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FeServer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FeServer.java @@ -18,6 +18,7 @@ package org.apache.doris.service; import org.apache.doris.common.ThriftServer; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.thrift.FrontendService; import org.apache.logging.log4j.LogManager; @@ -25,6 +26,7 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TProcessor; import java.io.IOException; +import java.lang.reflect.Proxy; /** * Doris frontend thrift server @@ -40,9 +42,21 @@ public class 
FeServer { } public void start() throws IOException { + FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance()); + FrontendService.Iface instance = (FrontendService.Iface) Proxy.newProxyInstance( + FrontendServiceImpl.class.getClassLoader(), + FrontendServiceImpl.class.getInterfaces(), + (proxy, method, args) -> { + long begin = System.currentTimeMillis(); + String name = method.getName(); + MetricRepo.THRIFT_COUNTER_RPC_ALL.getOrAdd(name).increase(1L); + Object r = method.invoke(service, args); + long end = System.currentTimeMillis(); + MetricRepo.THRIFT_COUNTER_RPC_LATENCY.getOrAdd(name).increase(end - begin); + return r; + }); // setup frontend server - TProcessor tprocessor = new FrontendService.Processor<FrontendService.Iface>( - new FrontendServiceImpl(ExecuteEnv.getInstance())); + TProcessor tprocessor = new FrontendService.Processor<>(instance); server = new ThriftServer(port, tprocessor); server.start(); LOG.info("thrift server started."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 843df334ee..85f0a32892 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -128,6 +128,7 @@ public class DatabaseTransactionMgr { // count the number of running txns of database, except for the routine load txn private volatile int runningTxnNums = 0; + private volatile int runningTxnReplicaNums = 0; // count only the number of running routine load txns of database private volatile int runningRoutineLoadTxnNums = 0; @@ -984,7 +985,11 @@ public class DatabaseTransactionMgr { return; } // update transaction state version - transactionState.setCommitTime(System.currentTimeMillis()); + long commitTime = System.currentTimeMillis(); + transactionState.setCommitTime(commitTime); + if 
(MetricRepo.isInit) { + MetricRepo.HISTO_TXN_EXEC_LATENCY.update(commitTime - transactionState.getPrepareTime()); + } transactionState.setTransactionStatus(TransactionStatus.COMMITTED); transactionState.setErrorReplicas(errorReplicaIds); for (long tableId : tableToPartition.keySet()) { @@ -1095,6 +1100,38 @@ public class DatabaseTransactionMgr { updateTxnLabels(transactionState); } + public void registerTxnReplicas(long txnId, int replicaNum) throws UserException { + writeLock(); + try { + TransactionState transactionState = idToRunningTransactionState.get(txnId); + if (transactionState == null) { + throw new UserException("running transaction not found, txnId=" + txnId); + } + transactionState.setReplicaNum(replicaNum); + runningTxnReplicaNums += replicaNum; + } finally { + writeUnlock(); + } + } + + public int getRunningTxnNum() { + readLock(); + try { + return runningTxnNums; + } finally { + readUnlock(); + } + } + + public int getRunningTxnReplicaNum() { + readLock(); + try { + return runningTxnReplicaNums; + } finally { + readUnlock(); + } + } + private void updateTxnLabels(TransactionState transactionState) { Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel()); if (txnIds == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 4eae5c817a..dccdd5903c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -30,6 +30,7 @@ import org.apache.doris.common.QuotaExceedException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MetaLockUtils; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.BatchRemoveTransactionsOperation; import org.apache.doris.persist.EditLog; import 
org.apache.doris.thrift.TStatus; @@ -640,4 +641,31 @@ public class GlobalTransactionMgr implements Writable { } throw new TimeoutException("Operation is timeout"); } + + public long getAllRunningTxnNum() { + long total = 0; + for (DatabaseTransactionMgr mgr : dbIdToDatabaseTransactionMgrs.values()) { + long num = mgr.getRunningTxnNum(); + total += num; + Database db = Env.getCurrentInternalCatalog().getDbNullable(mgr.getDbId()); + if (db != null) { + MetricRepo.DB_GAUGE_TXN_NUM.getOrAdd(db.getFullName()).setValue(num); + } + } + return total; + } + + public long getAllRunningTxnReplicaNum() { + long total = 0; + for (DatabaseTransactionMgr mgr : dbIdToDatabaseTransactionMgrs.values()) { + long num = mgr.getRunningTxnReplicaNum(); + total += num; + Database db = Env.getCurrentInternalCatalog().getDbNullable(mgr.getDbId()); + if (db != null) { + MetricRepo.DB_GAUGE_TXN_REPLICA_NUM.getOrAdd(db.getFullName()).setValue(num); + } + } + return total; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index e3ec3019ea..c1d894cf56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.Config; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; @@ -255,6 +256,10 @@ public class PublishVersionDaemon extends MasterDaemon { for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) { AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); } + if 
(MetricRepo.isInit) { + long publishTime = transactionState.getPublishVersionTime() - transactionState.getCommitTime(); + MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime); + } } } // end for readyTransactionStates } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 3a59fac9d9..d15f070e6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -171,6 +171,7 @@ public class TransactionState implements Writable { private long dbId; private List<Long> tableIdList; + private int replicaNum = 0; private long transactionId; private String label; // requestId is used to judge whether a begin request is a internal retry request. @@ -493,6 +494,14 @@ public class TransactionState implements Writable { return tableIdList; } + public int getReplicaNum() { + return replicaNum; + } + + public void setReplicaNum(int replicaNum) { + this.replicaNum = replicaNum; + } + public Map<Long, TableCommitInfo> getIdToTableCommitInfos() { return idToTableCommitInfos; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java index 15c62fb191..2ed1ddd67e 100755 --- a/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java @@ -17,13 +17,9 @@ package org.apache.doris.common; -import org.apache.doris.metric.Metric; -import org.apache.doris.metric.MetricRepo; - import org.junit.Assert; import org.junit.Test; -import java.util.List; import java.util.concurrent.ThreadPoolExecutor; public class ThreadPoolManagerTest { @@ -37,9 +33,6 @@ public class ThreadPoolManagerTest { ThreadPoolManager.registerThreadPoolMetric("test_cache_pool", 
testCachedPool); ThreadPoolManager.registerThreadPoolMetric("test_fixed_thread_pool", testFixedThreaddPool); - List<Metric> metricList = MetricRepo.getMetricsByName("thread_pool"); - - Assert.assertEquals(6, metricList.size()); Assert.assertEquals(ThreadPoolManager.LogDiscardPolicy.class, testCachedPool.getRejectedExecutionHandler().getClass()); Assert.assertEquals(ThreadPoolManager.BlockedPolicy.class, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org