Hi, I was checking out how to export metrics from a Kafka Streams app directly to the Prometheus registry, without using JMX, just by implementing a custom MetricsReporter.
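For context, the reporter is wired in via the standard metric.reporters client setting, roughly like this (the application id and bootstrap servers below are placeholders):

import java.util.Properties

import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-demo")      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
// Kafka instantiates the reporter reflectively, then calls configure() and init().
props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
  classOf[privateimpl.KafkaStreamsPrometheusMetricsReporter].getName)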
After some investigation it became clear that a metric with the same name can be reported by multiple entities, e.g. with different client-id tags, so it should be possible to differentiate them by that id. The MetricsReporter is configured for a Kafka Streams application.

What struck me as strange is that MetricsReporter#init() is always called with an empty list, so it is not possible to define the Prometheus metrics up front and then simply keep updating them with new values. Hence MetricsReporter#metricChange() has to both update metered values and define new metrics if they have not been set up yet.

Here a second problem appears: the label sets are variable, i.e. tags change after a metric was first registered, so when I try to set values I often get a constraint violation from Prometheus, because it requires the label set of a collector to be fixed.

*ENV:*

Kafka Streams (Scala): 2.1.0-cp1
Kafka: Confluent Cloud, GCP
Java: 8

java.lang.IllegalArgumentException: Incorrect number of labels.
    at io.prometheus.client.SimpleCollector.labels(SimpleCollector.java:64) ~[simpleclient-0.6.0.jar:?]
    at privateimpl.KafkaStreamsPrometheusMetricsReporter.metricChange(KafkaStreamsPrometheusMetricsReporter.scala:82) ~[classes/:?]
    at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:563) [kafka-clients-2.1.0-cp1.jar:?]
    at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:278) [kafka-clients-2.1.0-cp1.jar:?]
    at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:254) [kafka-clients-2.1.0-cp1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.recordPartitionLead(Fetcher.java:1489) [kafka-clients-2.1.0-cp1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.access$1600(Fetcher.java:1392) [kafka-clients-2.1.0-cp1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:557) [kafka-clients-2.1.0-cp1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:505) [kafka-clients-2.1.0-cp1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256) [kafka-clients-2.1.0-cp1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188) [kafka-clients-2.1.0-cp1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) [kafka-clients-2.1.0-cp1.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:913) [kafka-streams-2.1.0-cp1.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) [kafka-streams-2.1.0-cp1.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) [kafka-streams-2.1.0-cp1.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) [kafka-streams-2.1.0-cp1.jar:?]
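One direction I am experimenting with as a workaround (only a sketch, not yet verified against all metric groups) is to make the cache key label-aware, so that metrics sharing a name and group but carrying different tag sets map to different Gauges, and to always emit label values in a fixed (sorted) order:

import org.apache.kafka.common.metrics.KafkaMetric

import scala.collection.JavaConverters._

object LabelAwareKeys {
  // (metric name, group, sorted tag keys): one Gauge per distinct label set,
  // instead of one per (name, group) as in my implementation below.
  type LabelAwareKey = (String, String, List[String])

  def toKey(metric: KafkaMetric): LabelAwareKey = {
    val name = metric.metricName()
    (name.name(), name.group(), name.tags().keySet().asScala.toList.sorted)
  }

  // Values must line up positionally with the sorted keys used above,
  // otherwise labels() pairs values with the wrong label names.
  def labelValues(metric: KafkaMetric): List[String] = {
    val tags = metric.metricName().tags().asScala
    tags.keys.toList.sorted.map(tags(_))
  }
}

With such a key, getCachedMetric in the implementation below would register a fresh Gauge the first time a new tag combination shows up, and collector.labels(LabelAwareKeys.labelValues(metric): _*) should no longer trip over the fixed label count.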
Test implementation of the MetricsReporter:

package privateimpl

import java.util
import java.util.concurrent.ConcurrentHashMap

import com.typesafe.scalalogging.LazyLogging
import privateimpl.KafkaStreamsPrometheusMetricsReporter.MKey
import io.prometheus.client.{Collector, CollectorRegistry, Gauge}
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.KafkaMetric

import scala.util.{Failure, Success, Try}

object KafkaStreamsPrometheusMetricsReporter {

  type MKey = (String, String)
  // type MKey = MetricName

  private def toKey(metric: KafkaMetric): MKey = {
    val name = metric.metricName()
    (name.name(), name.group()) // name
  }

  // Builds a Gauge whose label names are the tag keys seen at registration
  // time -- this is where the fixed label set gets locked in.
  private def toPrometheusMetric(metric: KafkaMetric): Gauge = {
    val name = metric.metricName()
    val labels = name.tags().keySet().toArray(Array.empty[String]).map {
      Collector.sanitizeMetricName
    }
    Gauge
      .build(Collector.sanitizeMetricName(name.name()), Collector.sanitizeMetricName(name.description()))
      .namespace(Collector.sanitizeMetricName(name.group()))
      .help(s"Kafka description: ${name.description()}")
      .labelNames(labels: _*)
      .create()
  }
}

class KafkaStreamsPrometheusMetricsReporter
    extends org.apache.kafka.common.metrics.MetricsReporter
    with LazyLogging {

  import KafkaStreamsPrometheusMetricsReporter._

  private val registry = new CollectorRegistry(true)
  private val metricsCache = new ConcurrentHashMap[MKey, Gauge]

  private def getCachedMetric(metric: KafkaMetric): Gauge = {
    metricsCache.computeIfAbsent(toKey(metric), _ => {
      val p = toPrometheusMetric(metric)
      try {
        p.register(registry)
      } catch {
        case ex: IllegalArgumentException =>
          println(ex)
          throw ex
      }
      p
    })
  }

  override def init(metrics: util.List[KafkaMetric]): Unit = {
    // In practice this list is always empty (see above).
    metrics.forEach { m =>
      getCachedMetric(m)
      ()
    }
  }

  override def metricChange(metric: KafkaMetric): Unit = {
    val name = metric.metricName()
    val collector = getCachedMetric(metric)
    if (collector == null) {
      logger.error("Kafka metric name was not registered: {}", name)
    } else {
      // The current tag values may not match the label names the Gauge was
      // created with, which is what triggers the exception above.
      val labels = name.tags().values().toArray(Array.empty[String])
      Try {
        metric.metricValue()
      } match {
        case Success(value) =>
          value match {
            case d: java.lang.Double => collector.labels(labels: _*).set(d)
            case v                   => println(v)
          }
        case Failure(ex) =>
          logger.error("Failed to process {}", ex)
      }
    }
  }

  override def metricRemoval(metric: KafkaMetric): Unit = ()

  override def close(): Unit = ()

  override def configure(configs: util.Map[String, _]): Unit = ()
}

Best regards,
Artem Oboturov