Hi

I was checking out how to export metrics from a Kafka Streams app directly
to the Prometheus registry without using JMX, just by implementing a custom
MetricsReporter.

After some investigation, it became clear that the same metric name can be
reported by multiple entities, e.g. with different client-id tags, so the
only way to tell such metrics apart is by those tags.
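
For illustration, two metrics that differ only in their client-id tag (the
name mirrors a consumer metric; the description and tag values here are made
up):

import org.apache.kafka.common.MetricName
import scala.collection.JavaConverters._

// Same name and group, distinguishable only by tags:
val m1 = new MetricName("records-lag-max", "consumer-fetch-manager-metrics",
  "maximum record lag", Map("client-id" -> "app-consumer-1").asJava)
val m2 = new MetricName("records-lag-max", "consumer-fetch-manager-metrics",
  "maximum record lag", Map("client-id" -> "app-consumer-2").asJava)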

The MetricsReporter is configured for a Kafka Streams application.
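
For reference, it is plugged in via the standard metric.reporters client
config; a minimal sketch (application id and bootstrap servers are
placeholders):

import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app") // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092") // placeholder
// metric.reporters takes a comma-separated list of MetricsReporter classes
props.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
  classOf[KafkaStreamsPrometheusMetricsReporter].getName)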

What I found strange is that MetricsReporter#init() is always called with an
empty list, so it is not possible to define the Prometheus metrics properly
in advance and then simply keep updating them with new values. Hence
MetricsReporter#metricChange() has to both update metered values and define
new metrics when they have not been set up yet. Here a second problem
appears: the labels are variable, i.e. they change after a metric was first
registered, so when setting values I often hit a constraint violation in
Prometheus, which requires the label set of a collector to be fixed.
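
The constraint itself is easy to reproduce in isolation with just the
simpleclient Gauge API:

import io.prometheus.client.Gauge

// Label names are fixed when the collector is built.
val gauge = Gauge.build()
  .name("records_lag")
  .help("demo")
  .labelNames("client_id") // exactly one label name
  .create()

gauge.labels("consumer-1").set(42) // OK: one label value

// A different number of label values fails with
// java.lang.IllegalArgumentException: Incorrect number of labels.
gauge.labels("consumer-1", "topic-a").set(42)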

*ENV:*
Kafka Streams (Scala): 2.1.0-cp1
Kafka: Confluent.cloud, GCP
Java: 8

java.lang.IllegalArgumentException: Incorrect number of labels.
  at io.prometheus.client.SimpleCollector.labels(SimpleCollector.java:64) ~[simpleclient-0.6.0.jar:?]
  at privateimpl.KafkaStreamsPrometheusMetricsReporter.metricChange(KafkaStreamsPrometheusMetricsReporter.scala:82) ~[classes/:?]
  at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:563) [kafka-clients-2.1.0-cp1.jar:?]
  at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:278) [kafka-clients-2.1.0-cp1.jar:?]
  at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:254) [kafka-clients-2.1.0-cp1.jar:?]
  at org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.recordPartitionLead(Fetcher.java:1489) [kafka-clients-2.1.0-cp1.jar:?]
  at org.apache.kafka.clients.consumer.internals.Fetcher$FetchManagerMetrics.access$1600(Fetcher.java:1392) [kafka-clients-2.1.0-cp1.jar:?]
  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:557) [kafka-clients-2.1.0-cp1.jar:?]
  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:505) [kafka-clients-2.1.0-cp1.jar:?]
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256) [kafka-clients-2.1.0-cp1.jar:?]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188) [kafka-clients-2.1.0-cp1.jar:?]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) [kafka-clients-2.1.0-cp1.jar:?]
  at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:913) [kafka-streams-2.1.0-cp1.jar:?]
  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) [kafka-streams-2.1.0-cp1.jar:?]
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) [kafka-streams-2.1.0-cp1.jar:?]
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) [kafka-streams-2.1.0-cp1.jar:?]

Test implementation of the MetricsReporter:

package privateimpl

import java.util
import java.util.concurrent.ConcurrentHashMap

import com.typesafe.scalalogging.LazyLogging
import privateimpl.KafkaStreamsPrometheusMetricsReporter.MKey
import io.prometheus.client.{Collector, CollectorRegistry, Gauge}
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.KafkaMetric

import scala.util.{Failure, Success, Try}

object KafkaStreamsPrometheusMetricsReporter {
  // Cache key: metric name and group only. Tags are deliberately ignored,
  // so metrics that differ only in tags map to the same Prometheus collector.
  type MKey = (String, String)
//  type MKey = MetricName

  private def toKey(metric: KafkaMetric): MKey = {
    val name = metric.metricName()
    (name.name(), name.group())
//    name
  }

  private def toPrometheusMetric(metric: KafkaMetric): Gauge = {
    val name = metric.metricName()
    // Label names come from the tag keys of the first KafkaMetric seen for
    // this key; Prometheus fixes them for the collector's whole lifetime.
    val labels = name.tags().keySet().toArray(Array.empty[String]).map {
      Collector.sanitizeMetricName
    }

    Gauge
      .build()
      .namespace(Collector.sanitizeMetricName(name.group()))
      .name(Collector.sanitizeMetricName(name.name()))
      .help(s"Kafka description: ${name.description()}")
      .labelNames(labels: _*)
      .create()
  }
}

class KafkaStreamsPrometheusMetricsReporter extends
org.apache.kafka.common.metrics.MetricsReporter with LazyLogging {

  import KafkaStreamsPrometheusMetricsReporter._

  private val registry = new CollectorRegistry(true)
  private val metricsCache = new ConcurrentHashMap[MKey, Gauge]

  private def getCachedMetric(metric: KafkaMetric): Gauge = {
    metricsCache.computeIfAbsent(toKey(metric), _ => {
      val p = toPrometheusMetric(metric)
      try {
        p.register(registry)
      } catch {
        case ex: IllegalArgumentException =>
          // Thrown if a collector providing the same sample name is already
          // registered in this registry.
          logger.error(s"Failed to register ${metric.metricName()}", ex)
          throw ex
      }
      p
    })
  }

  override def init(metrics: util.List[KafkaMetric]): Unit = {
    // In practice this list has always been empty (see above), so all
    // registration effectively happens in metricChange().
    metrics.forEach { m =>
      getCachedMetric(m)
      ()
    }
  }

  override def metricChange(metric: KafkaMetric): Unit = {
    val name = metric.metricName()
    // computeIfAbsent either returns a registered Gauge or throws, so no
    // null check is needed here.
    val collector = getCachedMetric(metric)
    // Label values are taken from the current metric's tags. If their number
    // differs from the label names fixed at registration time, labels()
    // throws "Incorrect number of labels".
    val labels = name.tags().values().toArray(Array.empty[String])

    Try(metric.metricValue()) match {
      case Success(d: java.lang.Double) =>
        collector.labels(labels: _*).set(d)
      case Success(v) =>
        logger.warn("Ignoring non-numeric value of {}: {}", name, v)
      case Failure(ex) =>
        logger.error(s"Failed to read value of $name", ex)
    }
  }

  // Note: nothing is unregistered from the Prometheus registry on removal.
  override def metricRemoval(metric: KafkaMetric): Unit = ()

  override def close(): Unit = ()

  override def configure(configs: util.Map[String, _]): Unit = ()
}
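
One workaround I am considering instead of fixed-label Gauges: a custom
Collector that builds samples from the live KafkaMetric instances at scrape
time, so label names and values always come from the same tags map. A rough,
untested sketch (the class and its add/remove methods are my own naming, not
an existing API):

import java.util.concurrent.ConcurrentHashMap
import java.util.{List => JList}

import io.prometheus.client.Collector
import io.prometheus.client.Collector.{MetricFamilySamples, Type}
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.KafkaMetric

import scala.collection.JavaConverters._

class KafkaMetricsCollector extends Collector {
  private val metrics = new ConcurrentHashMap[MetricName, KafkaMetric]

  def add(m: KafkaMetric): Unit = { metrics.put(m.metricName(), m); () }
  def remove(m: KafkaMetric): Unit = { metrics.remove(m.metricName()); () }

  override def collect(): JList[MetricFamilySamples] = {
    val samples = metrics.values().asScala.toList.flatMap { m =>
      m.metricValue() match {
        case d: java.lang.Double =>
          val name = m.metricName()
          val family = Collector.sanitizeMetricName(s"${name.group()}_${name.name()}")
          val tags = name.tags().asScala.toList
          // Label names and values are read from the same tags map, so
          // their arity can never disagree.
          Some(family -> new MetricFamilySamples.Sample(
            family,
            tags.map(t => Collector.sanitizeMetricName(t._1)).asJava,
            tags.map(_._2).asJava,
            d.doubleValue()))
        case _ => None // skip non-numeric metric values
      }
    }
    // Merge samples that share a family name into one MetricFamilySamples.
    samples.groupBy(_._1).map { case (family, ss) =>
      new MetricFamilySamples(family, Type.GAUGE, s"Kafka metric $family",
        ss.map(_._2).asJava)
    }.toList.asJava
  }
}

The reporter would then register a single instance of this collector once
and delegate metricChange()/metricRemoval() to add()/remove(). Has anyone
tried something along these lines?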


Best regards,
Artem Oboturov
