This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new bb60ca64c15 feat: Add kafka/consumer/pollIdleRatio metric. (#19366)
bb60ca64c15 is described below
commit bb60ca64c150949404d0288d3e809045dbf443d7
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Apr 22 12:53:53 2026 -0700
feat: Add kafka/consumer/pollIdleRatio metric. (#19366)
The metric corresponds to the Kafka consumer metric "poll-idle-ratio-avg".
---
.../prometheus-emitter/src/main/resources/defaultMetrics.json | 1 +
.../java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java | 6 ++++++
.../org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java | 1 +
processing/src/main/resources/loggingEmitterAllowedMetrics.json | 1 +
processing/src/test/resources/loggingEmitterAllowedMetrics.json | 1 +
5 files changed, 10 insertions(+)
diff --git
a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
index 247686a838b..bbb4d7b11a5 100644
---
a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
+++
b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
@@ -97,6 +97,7 @@
"kafka/consumer/recordsPerRequestAvg" : { "dimensions" : ["topic"], "type" :
"gauge", "help": "Average records per fetch request as seen by the consumer of
a Kafka indexing task (per topic)."},
"kafka/consumer/outgoingBytes" : { "dimensions" : ["node_id"], "type" :
"count", "help": "Bytes sent to Kafka brokers by the consumer of a Kafka
indexing task (per node)."},
"kafka/consumer/incomingBytes" : { "dimensions" : ["node_id"], "type" :
"count", "help": "Bytes received from Kafka brokers by the consumer of a Kafka
indexing task (per node)."},
+ "kafka/consumer/pollIdleRatio" : { "dimensions" : [], "type" : "gauge",
"help": "Average fraction of time the consumer of a Kafka indexing task spent
idle inside poll(), as opposed to processing fetched records. 0 means never
idle, 1 means always idle."},
"ingest/count" : { "dimensions" : ["dataSource", "taskType"], "type" :
"count", "help": "Count of 1 every time an ingestion job runs (includes
compaction jobs). Aggregate using dimensions." },
"ingest/segments/count" : { "dimensions" : ["dataSource", "taskType"],
"type" : "count", "help": "Count of final segments created by job (includes
tombstones)." },
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
index 88c89c04f1c..a24779ec61e 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
@@ -127,6 +127,12 @@ public class KafkaConsumerMonitor extends AbstractMonitor
"kafka/consumer/incomingBytes",
Set.of(CLIENT_ID_TAG, NODE_ID_TAG),
KafkaConsumerMetric.MetricType.COUNTER
+ ),
+ new KafkaConsumerMetric(
+ POLL_IDLE_RATIO_METRIC_NAME,
+ "kafka/consumer/pollIdleRatio",
+ Set.of(CLIENT_ID_TAG),
+ KafkaConsumerMetric.MetricType.GAUGE
)
).collect(Collectors.toMap(KafkaConsumerMetric::getKafkaMetricName,
Function.identity()));
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index fbd7b64d988..6b1b51474c7 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -484,6 +484,7 @@ public class KafkaRecordSupplierTest
emitter.verifyEmitted("kafka/consumer/recordsPerRequestAvg", 1);
emitter.verifyEmitted("kafka/consumer/incomingBytes", 2);
emitter.verifyEmitted("kafka/consumer/outgoingBytes", 2);
+ emitter.verifyEmitted("kafka/consumer/pollIdleRatio", 1);
recordSupplier.close();
Assert.assertFalse(monitor.monitor(emitter));
diff --git a/processing/src/main/resources/loggingEmitterAllowedMetrics.json
b/processing/src/main/resources/loggingEmitterAllowedMetrics.json
index c4647a459c1..6349c04b5dd 100644
--- a/processing/src/main/resources/loggingEmitterAllowedMetrics.json
+++ b/processing/src/main/resources/loggingEmitterAllowedMetrics.json
@@ -76,6 +76,7 @@
"kafka/consumer/fetchSizeMax": [],
"kafka/consumer/incomingBytes": [],
"kafka/consumer/outgoingBytes": [],
+ "kafka/consumer/pollIdleRatio": [],
"kafka/consumer/recordsConsumed": [],
"kafka/consumer/recordsLag": [],
"kafka/consumer/recordsPerRequestAvg": [],
diff --git a/processing/src/test/resources/loggingEmitterAllowedMetrics.json
b/processing/src/test/resources/loggingEmitterAllowedMetrics.json
index 9ec51afff2d..06ff68822b3 100644
--- a/processing/src/test/resources/loggingEmitterAllowedMetrics.json
+++ b/processing/src/test/resources/loggingEmitterAllowedMetrics.json
@@ -76,6 +76,7 @@
"kafka/consumer/fetchSizeMax": [],
"kafka/consumer/incomingBytes": [],
"kafka/consumer/outgoingBytes": [],
+ "kafka/consumer/pollIdleRatio": [],
"kafka/consumer/recordsConsumed": [],
"kafka/consumer/recordsLag": [],
"kafka/consumer/recordsPerRequestAvg": [],
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]