Was there anything in the logs (ideally on debug)?
Have you debugged the execution and followed the counter() calls all the
way to the reporter?
Do you only see JobManager metrics, or is there somewhere also something
about the TaskManager?
I can see several issues with your code, but none that would fully
explain the issue:
a) your reporter is not thread-safe
b) you only differentiate metrics by name, which will lead to quite a
few collisions.
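A self-contained sketch of one way to address both points, with a ConcurrentHashMap keyed by a (metric, tenant) Pair rather than a bare name, and a plain AtomicLong standing in for Flink's Counter (all names here are illustrative, not Flink API):

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch: a thread-safe registry keyed by (metric, tenant)
// pairs. computeIfAbsent makes registration atomic, and the Pair key
// avoids collisions between different tenants using the same metric name.
object SafeRegistry {
    private val counters = ConcurrentHashMap<Pair<String, String>, AtomicLong>()

    fun inc(metric: String, tenant: String, amount: Long = 1) {
        counters.computeIfAbsent(metric to tenant) { AtomicLong() }.addAndGet(amount)
    }

    fun get(metric: String, tenant: String): Long =
        counters[metric to tenant]?.get() ?: 0
}

fun main() {
    // Concurrent increments from several threads land on one shared counter.
    val threads = (1..4).map {
        Thread { repeat(1000) { SafeRegistry.inc("events", "tenant1") } }
    }
    threads.forEach { it.start() }
    threads.forEach { it.join() }
    println(SafeRegistry.get("events", "tenant1")) // 4000
}
```

The same computeIfAbsent pattern would apply if the map values were real Flink Counters created via metricGroup.addGroup(...).counter(...).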
Also be aware that there will be 2 reporter instances; one for the JM
and one for the TM.
To remedy this, I would recommend creating a factory that returns a
static reporter instance instead; overall this tends to be cleaner.
Alternatively, when using the testing harnesses, IIRC you can also set
a custom MetricGroup implementation.
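To illustrate the factory suggestion above in plain Kotlin (the Reporter/ReporterFactory names here are hypothetical stand-ins, not Flink's actual MetricReporterFactory interface): the factory hands every caller the same static instance, so the JM-side and TM-side reporters share one metric collection.

```kotlin
// Hypothetical stand-in for a metric reporter.
interface Reporter { fun notifyAdded(name: String) }

class RecordingReporter : Reporter {
    val seen = mutableListOf<String>()
    override fun notifyAdded(name: String) { seen += name }
}

object ReporterFactory {
    // One shared instance, no matter how many times the factory is
    // invoked (e.g. once for the JobManager, once for the TaskManager).
    private val instance = RecordingReporter()
    fun create(): RecordingReporter = instance
}

fun main() {
    val jmReporter = ReporterFactory.create()
    val tmReporter = ReporterFactory.create()
    jmReporter.notifyAdded("numRecordsIn")
    println(jmReporter === tmReporter) // true: both sides share state
    println(tmReporter.seen)           // [numRecordsIn]
}
```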
On 3/16/2021 4:13 AM, Rion Williams wrote:
Hi all,
Recently, I was working on adding some custom metrics to a Flink job
that required the use of dynamic labels (i.e. capturing various
counters that were "slicable" by things like tenant / source, etc.).
I ended up handling it in a very naive fashion that would just keep a
dictionary of metrics that had already been registered and update them
accordingly which looked something like this:
class MyCustomProcessFunction : ProcessFunction<Event, Unit>() {
    private lateinit var metrics: CustomMetricsRegistry

    override fun open(parameters: Configuration) {
        metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
    }

    override fun processElement(event: Event, context: Context, collector: Collector<Unit>) {
        // Insert calls like metrics.inc("tenant-name", 4) here
    }
}
class CustomMetricsRegistry(private val metricGroup: MetricGroup) : Serializable {
    // Increments a given metric by key
    fun inc(metric: String, tenant: String, amount: Long = 1) {
        // Store a key for the metric
        val key = "$metric-$tenant"
        // Store/register the metric
        if (!registeredMetrics.containsKey(key)) {
            registeredMetrics[key] = metricGroup
                .addGroup("tenant", tenant)
                .counter(metric)
        }
        // Update the metric by a given amount
        registeredMetrics[key]!!.inc(amount)
    }

    companion object {
        private var registeredMetrics: HashMap<String, Counter> = hashMapOf()
    }
}
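As an aside, the "$metric-$tenant" key built above has a subtle ambiguity worth noting: because the separator is a plain hyphen, two distinct (metric, tenant) pairs can concatenate to the same string key. A runnable illustration (the stringKey helper is just a stand-in for the key expression above):

```kotlin
// Illustration of the key-collision risk in "$metric-$tenant": the hyphen
// separator cannot distinguish where the metric name ends and the tenant
// begins, so distinct pairs can collapse onto one key (and one Counter).
fun stringKey(metric: String, tenant: String) = "$metric-$tenant"

fun main() {
    val a = stringKey("requests-eu", "west") // "requests-eu-west"
    val b = stringKey("requests", "eu-west") // "requests-eu-west"
    println(a == b) // true: both pairs would share one registry entry
}
```

Keying the map by a Pair (or any composite key type) instead of a concatenated string avoids this.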
Basically, it registers and updates new metrics for tenants as they are
encountered, and I've verified the metrics are emitted as expected by
hitting the appropriately configured metrics endpoint (using a
PrometheusReporter).
However, while I was trying to write a few unit tests for this, I
seemed to encounter an issue. I was following a Stack Overflow post
that was answered by Chesnay Schepler [0]
that described the use of an in-memory/embedded Flink cluster and a
custom reporter that would statically expose the underlying metrics.
So I took a shot at implementing something similar as follows:
*Flink Cluster Definition*
private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
    ConfigConstants.METRICS_REPORTER_PREFIX +
        "MockCustomMetricsReporter." +
        ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
        MockCustomMetricsReporter::class.java.name
))

@ClassRule @JvmField
val flinkCluster = MiniClusterResource(
    MiniClusterResourceConfiguration.Builder()
        .setConfiguration(metricsConfiguration)
        .setNumberTaskManagers(1)
        .setNumberSlotsPerTaskManager(1)
        .build()
)
*Custom Reporter*
class MockCustomMetricsReporter : MetricReporter {
    override fun open(metricConfig: MetricConfig) {}
    override fun close() {}

    override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
        // Store the metrics that are being registered as we see them
        if (!registeredCustomMetrics.containsKey(name)) {
            registeredCustomMetrics[name] = metric
        }
    }

    override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
        // Do nothing here
    }

    companion object {
        // Static reference to metrics as they are registered
        var registeredCustomMetrics = HashMap<String, Metric>()
    }
}
*Example Test*
@Test
fun `Example Metrics Use Case`() {
    // Arrange
    val stream = StreamExecutionEnvironment.getExecutionEnvironment()
    val events = listOf(
        eventWithUsers("tenant1", "us...@testing.com"),
        eventWithUsers("tenant2", "us...@testing.com"),
    )

    // Act
    stream
        .fromCollection(events)
        .process(MyCustomProcessFunction())

    // Assert
    stream.execute()
    assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
}
While this test will pass, *the problem is that the custom metrics
defined dynamically (via the CustomMetricsRegistry implementation) do
not appear within the registeredCustomMetrics collection*. In fact,
there are 21 metrics that get registered but all of them appear to be
classic out-of-the-box metrics such as CPU usage, number of task
managers, load, various other Netty and JVM stats, but no custom
metrics are included.
I've tried multiple different configurations and implementations (e.g.
via a custom TestHarness), but for some reason the dynamically defined
custom metrics never trigger the notifyOfAddedMetric function, which
would be responsible for adding them to the static collection to be
asserted against.
Any ideas / guidance would be more than welcome. Perhaps a different
approach? Based off examples I've encountered, the code seems like it
should "just work".
Thanks much,
Rion
[0] :
https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink