Hi all,

Recently, I was working on adding some custom metrics to a Flink job that
required dynamic labels (i.e. capturing various counters that were
"sliceable" by dimensions like tenant, source, etc.).

I ended up handling it in a fairly naive fashion: keeping a dictionary of
metrics that had already been registered and updating them accordingly,
which looked something like this:

class MyCustomProcessFunction : ProcessFunction<Event, Unit>() {
    private lateinit var metrics: CustomMetricsRegistry

    override fun open(parameters: Configuration) {
        metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
    }

    override fun processElement(event: Event, context: Context, collector: Collector<Unit>) {
        // Insert calls like metrics.inc("metric-name", "tenant-name", 4) here
    }
}

class CustomMetricsRegistry(private val metricGroup: MetricGroup) : Serializable {
    // Increments a given metric, keyed by metric name and tenant
    fun inc(metric: String, tenant: String, amount: Long = 1) {
        // Build a unique key for the metric/tenant pair
        val key = "$metric-$tenant"
        // Register the metric on first use
        if (!registeredMetrics.containsKey(key)) {
            registeredMetrics[key] = metricGroup
                .addGroup("tenant", tenant)
                .counter(metric)
        }

        // Update the metric by the given amount
        registeredMetrics[key]!!.inc(amount)
    }

    companion object {
        private val registeredMetrics: HashMap<String, Counter> = hashMapOf()
    }
}

Basically, it registers and updates new metrics for tenants as they are
encountered, and I've verified the metrics are emitted as expected by
hitting the appropriately configured metrics endpoint (using a
PrometheusReporter).
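For context, the registration pattern boils down to the following self-contained sketch (using a plain-Kotlin SimpleCounter standing in for Flink's Counter, so the names here are illustrative only, not Flink's actual interfaces):

```kotlin
// Hypothetical stand-in for Flink's Counter, just to show the keyed
// registration pattern in isolation.
class SimpleCounter {
    var count: Long = 0
        private set

    fun inc(amount: Long = 1) {
        count += amount
    }
}

object Registry {
    private val registeredMetrics = hashMapOf<String, SimpleCounter>()

    // Register the counter on first sight of a metric/tenant pair,
    // then increment it by the given amount.
    fun inc(metric: String, tenant: String, amount: Long = 1) {
        registeredMetrics.getOrPut("$metric-$tenant") { SimpleCounter() }.inc(amount)
    }

    fun count(metric: String, tenant: String): Long =
        registeredMetrics["$metric-$tenant"]?.count ?: 0
}

fun main() {
    Registry.inc("events", "tenant1")
    Registry.inc("events", "tenant1", 4)
    Registry.inc("events", "tenant2")
    println(Registry.count("events", "tenant1")) // 5
    println(Registry.count("events", "tenant2")) // 1
}
```

Each metric/tenant pair is registered exactly once and incremented thereafter, which is the same behavior the real implementation gets from the Flink MetricGroup.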

However, while trying to write a few unit tests for this, I ran into an
issue. I was following a Stack Overflow post answered by @Chesnay Schepler
<ches...@apache.org> [0] that described using an in-memory/embedded Flink
cluster along with a custom reporter that statically exposes the underlying
metrics.

So I took a shot at implementing something similar as follows:

*Flink Cluster Definition*

private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
    ConfigConstants.METRICS_REPORTER_PREFIX +
        "MockCustomMetricsReporter." +
        ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
        MockCustomMetricsReporter::class.java.name
))

@ClassRule
@JvmField
val flinkCluster = MiniClusterResource(
    MiniClusterResourceConfiguration.Builder()
        .setConfiguration(metricsConfiguration)
        .setNumberTaskManagers(1)
        .setNumberSlotsPerTaskManager(1)
        .build()
)

*Custom Reporter*

class MockCustomMetricsReporter : MetricReporter {

    override fun open(metricConfig: MetricConfig) {}
    override fun close() {}

    override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
        // Store the metrics that are being registered as we see them
        if (!registeredCustomMetrics.containsKey(name)) {
            registeredCustomMetrics[name] = metric
        }
    }

    override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
        // Do nothing here
    }

    companion object {
        // Static reference to metrics as they are registered
        val registeredCustomMetrics = HashMap<String, Metric>()
    }
}

*Example Test*

@Test
fun `Example Metrics Use Case`() {
    // Arrange
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val events = listOf(
        eventWithUsers("tenant1", "us...@testing.com"),
        eventWithUsers("tenant2", "us...@testing.com"),
    )

    // Act
    env
        .fromCollection(events)
        .process(MyCustomProcessFunction())

    // Assert
    env.execute()
    assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
}

While this test passes, *the problem is that the custom metrics defined
dynamically (via the CustomMetricsRegistry implementation) never appear in
the registeredCustomMetrics collection*. In fact, 21 metrics get
registered, but all of them are classic out-of-the-box metrics (CPU usage,
number of task managers, load, and various other Netty and JVM stats),
with no custom metrics among them.

I've tried multiple different configurations and implementations (including
a custom TestHarness), but for some reason the dynamically defined custom
metrics never trigger the notifyOfAddedMetric function, which is
responsible for adding them to the static collection asserted against.

Any ideas / guidance would be more than welcome. Perhaps a different
approach? Based on the examples I've encountered, the code seems like it
should "just work".

Thanks much,

Rion

[0] :
https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink
