Hello Artem,

Your observation is correct: for Streams, as for many other clients, MetricsReporter#metricChange() is primarily used for registering new metrics. This is because at construction time of the client (i.e. when MetricsReporter#init() is called) the finer-grained metrics, like per-task / per-processor metrics, or per-destination-broker metrics (for the producer / consumer clients, e.g.), are not known yet, so they have to be created later during runtime.
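To make that lifecycle concrete, here is a minimal sketch of a reporter that registers in metricChange() rather than init(); the class name is hypothetical and the body is illustrative only, not code from Kafka or Streams:

    import java.util
    import java.util.concurrent.ConcurrentHashMap

    import org.apache.kafka.common.MetricName
    import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}

    // Illustrative only: treat metricChange() as "register or update".
    class LifecycleAwareReporter extends MetricsReporter {
      private val known = new ConcurrentHashMap[MetricName, KafkaMetric]

      // Called once at client construction; for a Streams app this list is
      // typically empty because the finer-grained metrics do not exist yet.
      override def init(metrics: util.List[KafkaMetric]): Unit =
        metrics.forEach { m =>
          known.put(m.metricName(), m)
          ()
        }

      // Called whenever a metric is added or updated at runtime (e.g. once
      // tasks are assigned), so registration has to happen here.
      override def metricChange(metric: KafkaMetric): Unit = {
        known.put(metric.metricName(), metric)
        ()
      }

      // Metrics can also go away at runtime, e.g. on a rebalance.
      override def metricRemoval(metric: KafkaMetric): Unit = {
        known.remove(metric.metricName())
        ()
      }

      override def close(): Unit = known.clear()

      override def configure(configs: util.Map[String, _]): Unit = ()
    }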
As for your second question: I'm not sure what you mean by `metric labels`, or how they would be modified. A `MetricName` contains a name, a group name, and a tags map, and all of them should be fixed once the metric is registered. (One way to keep the Prometheus label set fixed is sketched after the quoted code below.)

Guozhang

On Fri, Feb 8, 2019 at 3:22 PM Artem Oboturov <artem.obotu...@smarttra.de> wrote:

> Hi
>
> I was checking out how to export metrics from a Kafka Streams app directly
> to the Prometheus registry without using JMX, just by implementing a
> custom MetricsReporter.
>
> After some investigation, it became clear that a metric with the same name
> could be used by multiple entities, e.g. with different client-ids, so it
> would be possible to differentiate them by that id.
>
> The MetricsReporter is configured for a Kafka Streams application.
>
> What I found strange was that MetricsReporter#init() is always called
> with an empty list, so it is not possible to define Prometheus metrics
> properly in advance and then keep them there to be updated with new
> values. Hence MetricsReporter#metricChange() is used both to make changes
> to metered values and to define new metrics if they were not yet set up.
> Here comes a second problem: labels are variable, i.e. they are modified
> after they were first initialized, so when trying to set values I often
> hit a constraint violation in Prometheus, because it requires the label
> set to be fixed.
>
> *ENV:*
> Kafka Streams (Scala): 2.1.0-cp1
> Kafka: Confluent.cloud, GCP
> Java: 8
>
> java.lang.IllegalArgumentException: Incorrect number of labels.
>   at io.prometheus.client.SimpleCollector.labels(SimpleCollector.java:64) ~[simpleclient-0.6.0.jar:?]
>   at privateimpl.KafkaStreamsPrometheusMetricsReporter.metricChange(KafkaStreamsPrometheusMetricsReporter.scala:82) ~[classes/:?]
>   at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:563) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:278) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:254) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.recordPartitionLead(Fetcher.java:1489) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.access$1600(Fetcher.java:1392) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:557) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:505) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:913) [kafka-streams-2.1.0-cp1.jar:?]
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) [kafka-streams-2.1.0-cp1.jar:?]
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) [kafka-streams-2.1.0-cp1.jar:?]
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) [kafka-streams-2.1.0-cp1.jar:?]
>
> Test implementation of the MetricsReporter:
>
> package privateimpl
>
> import java.util
> import java.util.concurrent.ConcurrentHashMap
>
> import com.typesafe.scalalogging.LazyLogging
> import privateimpl.KafkaStreamsPrometheusMetricsReporter.MKey
> import io.prometheus.client.{Collector, CollectorRegistry, Gauge}
> import org.apache.kafka.common.MetricName
> import org.apache.kafka.common.metrics.KafkaMetric
>
> import scala.util.{Failure, Success, Try}
>
> object KafkaStreamsPrometheusMetricsReporter {
>   type MKey = (String, String)
>   // type MKey = MetricName
>
>   private def toKey(metric: KafkaMetric): MKey = {
>     val name = metric.metricName()
>     (name.name(), name.group())
>     // name
>   }
>
>   private def toPrometheusMetric(metric: KafkaMetric): Gauge = {
>     val name = metric.metricName()
>     val labels = name.tags().keySet().toArray(Array.empty[String]).map {
>       Collector.sanitizeMetricName
>     }
>
>     Gauge
>       .build(Collector.sanitizeMetricName(name.name()),
>              Collector.sanitizeMetricName(name.description()))
>       .namespace(Collector.sanitizeMetricName(name.group()))
>       .help(s"Kafka description: ${name.description()}")
>       .labelNames(labels: _*)
>       .create()
>   }
> }
>
> class KafkaStreamsPrometheusMetricsReporter
>     extends org.apache.kafka.common.metrics.MetricsReporter
>     with LazyLogging {
>
>   import KafkaStreamsPrometheusMetricsReporter._
>
>   private val registry = new CollectorRegistry(true)
>   private val metricsCache = new ConcurrentHashMap[MKey, Gauge]
>
>   private def getCachedMetric(metric: KafkaMetric): Gauge = {
>     metricsCache.computeIfAbsent(toKey(metric), _ => {
>       val p = toPrometheusMetric(metric)
>       try {
>         p.register(registry)
>       } catch {
>         case ex: IllegalArgumentException =>
>           println(ex)
>           throw ex
>       }
>       p
>     })
>   }
>
>   override def init(metrics: util.List[KafkaMetric]): Unit = {
>     metrics.forEach { m =>
>       getCachedMetric(m)
>       ()
>     }
>   }
>
>   override def metricChange(metric: KafkaMetric): Unit = {
>     val name = metric.metricName()
>     val collector = getCachedMetric(metric)
>     if (collector == null) {
>       logger.error("Kafka metric name was not registered: {}", name)
>     } else {
>       val labels = name.tags().values().toArray(Array.empty[String])
>
>       Try {
>         metric.metricValue()
>       } match {
>         case Success(value) =>
>           value match {
>             case d: java.lang.Double =>
>               collector.labels(labels: _*).set(d)
>             case v =>
>               println(v)
>           }
>         case Failure(ex) =>
>           logger.error("Failed to process {}", ex)
>       }
>     }
>   }
>
>   override def metricRemoval(metric: KafkaMetric): Unit = ()
>
>   override def close(): Unit = ()
>
>   override def configure(configs: util.Map[String, _]): Unit = ()
> }
> --

--
Guozhang
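P.S. Regarding the `Incorrect number of labels` error above: Kafka can register metrics that share a name and group but carry different tag-key sets (e.g. a fetch metric with and without a `topic` tag), so a cache keyed only on (name, group), as in the quoted code, can hand back a Gauge whose labelNames() no longer match the incoming metric's tags. Below is a hedged sketch of one workaround, assuming the same simpleclient API as the code above; the object name and `update` helper are hypothetical, not a tested implementation. The idea is to include the sorted tag keys in the cache key and order the label values the same way:

    import java.util.concurrent.ConcurrentHashMap

    import io.prometheus.client.{Collector, CollectorRegistry, Gauge}
    import org.apache.kafka.common.metrics.KafkaMetric

    import scala.collection.JavaConverters._

    // Hypothetical sketch: keying on the sorted tag keys (not just
    // name/group) keeps each Prometheus collector's label set fixed.
    object FixedLabelCacheSketch {
      type MKey = (String, String, List[String])

      private val registry = new CollectorRegistry(true)
      private val cache = new ConcurrentHashMap[MKey, Gauge]

      private def sortedTagKeys(metric: KafkaMetric): List[String] =
        metric.metricName().tags().keySet().asScala.toList.sorted

      private def toKey(metric: KafkaMetric): MKey = {
        val n = metric.metricName()
        (n.group(), n.name(), sortedTagKeys(metric))
      }

      def update(metric: KafkaMetric): Unit = {
        val n = metric.metricName()
        val keys = sortedTagKeys(metric)
        val gauge = cache.computeIfAbsent(toKey(metric), _ => {
          Gauge
            .build()
            .name(Collector.sanitizeMetricName(s"${n.group()}_${n.name()}"))
            .help(if (n.description().isEmpty) n.name() else n.description())
            .labelNames(keys.map(Collector.sanitizeMetricName): _*)
            .register(registry)
        })
        // Order the tag values by the same sorted keys used for
        // labelNames(), so label count and order always match the
        // registered collector.
        val values = keys.map(n.tags().get(_))
        metric.metricValue() match {
          case d: java.lang.Double => gauge.labels(values: _*).set(d)
          case _                   => () // skip non-numeric metric values
        }
      }
    }

A real reporter would also need metricRemoval() to drop the corresponding child series (Gauge#remove) or unregister the collector, so that stale series do not linger after a rebalance.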