Hello Artem,

Your observation is correct: for Streams, as well as many other clients,
MetricsReporter#metricChange() is primarily used for registering new
metrics. This is because at construction time of the client (i.e. when
MetricsReporter#init() is called) the finer-grained metrics, such as
per-task / per-processor metrics, or per-destination-broker metrics (for
producer / consumer clients, for example), are not known yet, so they have
to be created later at runtime.
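
For illustration, here is a minimal sketch of that lifecycle (the class
name and log lines are mine, not part of any Kafka API): init() only sees
the metrics that already exist when the client is constructed, and every
metric created afterwards arrives through metricChange().

import java.util
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}

// Hypothetical reporter that only traces when each callback fires.
class LifecycleTracingReporter extends MetricsReporter {
  // Called once, with the metrics that exist at client construction time.
  override def init(metrics: util.List[KafkaMetric]): Unit =
    metrics.forEach(m => println(s"init: ${m.metricName()}"))

  // Called whenever a metric is added or updated later at runtime
  // (per-task, per-destination-broker, per-partition, ...).
  override def metricChange(metric: KafkaMetric): Unit =
    println(s"metricChange: ${metric.metricName()}")

  override def metricRemoval(metric: KafkaMetric): Unit =
    println(s"metricRemoval: ${metric.metricName()}")

  override def close(): Unit = ()
  override def configure(configs: util.Map[String, _]): Unit = ()
}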

As for your second question: I'm not sure what you mean by `metric labels`
and how they are modified. A `MetricName` contains a name, a group name,
and a tags map, all of which are fixed at the time the metric is
registered.
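
Note, though, that two different metrics can share the same name and group
while carrying different tag key sets, e.g. a client-level metric tagged
only with client-id next to a per-partition one tagged with client-id /
topic / partition. A Prometheus collector cache keyed only by (name,
group), as in your reporter below, will then reuse a Gauge whose label
names do not match, which is exactly the "Incorrect number of labels"
error in your stack trace. A minimal sketch of one way around it (the key
type and helper here are my own, not from your code): include the sorted
tag keys in the cache key.

import org.apache.kafka.common.MetricName

object MetricKeys {
  // (metric name, group, sorted tag keys): metrics that differ only in
  // their tag key sets now map to distinct Prometheus collectors.
  type MKey = (String, String, List[String])

  def toKey(name: MetricName): MKey =
    (name.name(),
     name.group(),
     name.tags().keySet().toArray(Array.empty[String]).toList.sorted)
}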


Guozhang

On Fri, Feb 8, 2019 at 3:22 PM Artem Oboturov <artem.obotu...@smarttra.de>
wrote:

> Hi
>
> I was checking out how to export metrics from a Kafka Streams app directly
> to the Prometheus registry without using JMX, just by implementing a custom
> MetricsReporter.
>
> After some investigation, it became clear that a metric with the same name
> can be used by multiple entities, e.g. with different client-ids, so it
> should be possible to differentiate them by that id.
>
> The MetricsReporter is configured for a Kafka Streams application.
>
> What felt strange was that MetricsReporter#init() is always called with an
> empty list, so it is not possible to define the Prometheus metrics properly
> in advance and then keep updating them with new values. Hence
> MetricsReporter#metricChange() has to be used both to update metered values
> and to define new metrics if they were not yet set up. Here comes a second
> problem: the labels are variable, i.e. they change after a metric was first
> initialized, so when trying to set values I often get a constraint
> violation from Prometheus, which requires the label set of a metric to be
> fixed.
>
> *ENV:*
> Kafka Streams (Scala): 2.1.0-cp1
> Kafka: Confluent.cloud, GCP
> Java: 8
>
> java.lang.IllegalArgumentException: Incorrect number of labels.
>   at io.prometheus.client.SimpleCollector.labels(SimpleCollector.java:64) ~[simpleclient-0.6.0.jar:?]
>   at privateimpl.KafkaStreamsPrometheusMetricsReporter.metricChange(KafkaStreamsPrometheusMetricsReporter.scala:82) ~[classes/:?]
>   at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:563) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:278) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:254) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.recordPartitionLead(Fetcher.java:1489) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.access$1600(Fetcher.java:1392) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:557) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:505) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) [kafka-clients-2.1.0-cp1.jar:?]
>   at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:913) [kafka-streams-2.1.0-cp1.jar:?]
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) [kafka-streams-2.1.0-cp1.jar:?]
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) [kafka-streams-2.1.0-cp1.jar:?]
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) [kafka-streams-2.1.0-cp1.jar:?]
>
> Test implementation of the MetricsReporter:
>
> package privateimpl
>
> import java.util
> import java.util.concurrent.ConcurrentHashMap
>
> import com.typesafe.scalalogging.LazyLogging
> import privateimpl.KafkaStreamsPrometheusMetricsReporter.MKey
> import io.prometheus.client.{Collector, CollectorRegistry, Gauge}
> import org.apache.kafka.common.MetricName
> import org.apache.kafka.common.metrics.KafkaMetric
>
> import scala.util.{Failure, Success, Try}
>
> object KafkaStreamsPrometheusMetricsReporter {
>   type MKey = (String, String)
> //  type MKey = MetricName
>
>   // NOTE: keying only by (name, group) drops the tag keys, so metrics
>   // that share a name and group but carry different tag key sets end up
>   // mapped to the same Prometheus collector.
>   private def toKey(metric: KafkaMetric): MKey = {
>     val name = metric.metricName()
>     (name.name(), name.group())
> //    name
>   }
>
>   private def toPrometheusMetric(metric: KafkaMetric): Gauge = {
>     val name = metric.metricName()
>     // Prometheus label names must be valid identifiers, so sanitize the
>     // Kafka tag keys.
>     val labels = name.tags().keySet().toArray(Array.empty[String]).map {
>       Collector.sanitizeMetricName
>     }
>
>     Gauge
>       .build()
>       .name(Collector.sanitizeMetricName(name.name()))
>       .namespace(Collector.sanitizeMetricName(name.group()))
>       .help(s"Kafka description: ${name.description()}")
>       .labelNames(labels: _*)
>       .create()
>   }
> }
>
> class KafkaStreamsPrometheusMetricsReporter extends
> org.apache.kafka.common.metrics.MetricsReporter with LazyLogging {
>
>   import KafkaStreamsPrometheusMetricsReporter._
>
>   private val registry = new CollectorRegistry(true)
>   private val metricsCache = new ConcurrentHashMap[MKey, Gauge]
>
>   private def getCachedMetric(metric: KafkaMetric): Gauge = {
>     metricsCache.computeIfAbsent(toKey(metric), _ => {
>       val p = toPrometheusMetric(metric)
>       try {
>         p.register(registry)
>       } catch {
>         case ex: IllegalArgumentException =>
>           logger.error("Failed to register Prometheus collector", ex)
>           throw ex
>       }
>       p
>     })
>   }
>
>   override def init(metrics: util.List[KafkaMetric]): Unit = {
>     metrics.forEach { m =>
>       getCachedMetric(m)
>
>       ()
>     }
>   }
>
>   override def metricChange(metric: KafkaMetric): Unit = {
>     val name = metric.metricName()
>     val collector = getCachedMetric(metric)
>     if (collector == null) {
>       logger.error("Kafka metric name was not registered: {}", name)
>     } else {
>       val labels = name.tags().values().toArray(Array.empty[String])
>
>       Try {
>         metric.metricValue()
>       } match {
>         case Success(value) =>
>           value match {
>             case d: java.lang.Double =>
>               collector.labels(labels: _*).set(d)
>             case v =>
>               logger.warn("Non-numeric value for metric {}: {}", name, v)
>           }
>         case Failure(ex) =>
>           logger.error(s"Failed to read value of metric $name", ex)
>       }
>     }
>   }
>
>   override def metricRemoval(metric: KafkaMetric): Unit = ()
>
>   override def close(): Unit = ()
>
>   override def configure(configs: util.Map[String, _]): Unit = ()
> }
>


-- 
-- Guozhang
