Are you actually running a job, or are you using a harness for testing your function?

On 3/16/2021 3:24 PM, Rion Williams wrote:
Hi Chesnay,

Thanks for the prompt response and feedback, it's very much appreciated. Please see the inline responses below to your questions:

    *Was there anything in the logs (ideally on debug)?*


I didn't see anything within the logs that seemed to indicate anything out of the ordinary. I'm currently using a MiniClusterResource for this and attempted to set the logging levels to pick up everything (i.e. ALL), but if there's a way to expose more, I'm not aware of it.
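
For reference, something along these lines in a log4j2-test.properties on the test classpath is what I'd expect to surface more detail (standard log4j2 properties format; the dedicated metrics logger name is an assumption on my part):

rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Assumed logger name for the metrics registry internals
logger.metrics.name = org.apache.flink.runtime.metrics
logger.metrics.level = TRACE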

    *Have you debugged the execution and followed the counter() calls
    all the way to the reporter?*


With the debugger, I traced one of the counter initializations and it seems that no reporters were being found within the register call in the MetricRegistryImpl (i.e. this.reporters has no registered reporters):
if (this.reporters != null) {
    for (int i = 0; i < this.reporters.size(); ++i) {
        MetricRegistryImpl.ReporterAndSettings reporterAndSettings =
            (MetricRegistryImpl.ReporterAndSettings) this.reporters.get(i);

        try {
            if (reporterAndSettings != null) {
                FrontMetricGroup front =
                    new FrontMetricGroup(reporterAndSettings.getSettings(), group);
                reporterAndSettings.getReporter().notifyOfAddedMetric(metric, metricName, front);
            }
        } catch (Exception var11) {
            LOG.warn("Error while registering metric: {}.", metricName, var11);
        }
    }
}
 Perhaps this is an error on my part as I had assumed the following would be sufficient to register my reporter (within a local / minicluster environment):
private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
    ConfigConstants.METRICS_REPORTER_PREFIX +
        "MockCustomMetricsReporter." +
        ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
        MockCustomMetricsReporter::class.java.name
))

@ClassRule @JvmField
val flink = MiniClusterResource(
    MiniClusterResourceConfiguration.Builder()
        .setConfiguration(metricsConfiguration)
        .setNumberTaskManagers(1)
        .setNumberSlotsPerTaskManager(1)
        .build()
)
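
If I'm reading ConfigConstants correctly, that map should expand to a single entry keyed "metrics.reporter.MockCustomMetricsReporter.class". A quick sanity check along these lines (hypothetical, not part of my actual test) could confirm the key is built as expected:

// Hypothetical sanity check: the concatenated constants should produce the
// documented "metrics.reporter.<name>.class" key.
val expectedKey = "metrics.reporter.MockCustomMetricsReporter.class"
check(metricsConfiguration.toMap().containsKey(expectedKey)) {
    "Reporter key not found; got ${metricsConfiguration.toMap().keys}"
}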
However, the reporter is clearly being picked up for the built-in metrics, since those do trigger the notifyOfAddedMetric() function within the reporter itself; it's only the custom metrics being registered that never reach it.

    *Do you only see JobManager metrics, or is there somewhere also
    something about the TaskManager?*


It looks like metrics are coming from both the JobManager and the TaskManagers, based on examples like the following:
localhost.jobmanager.numRegisteredTaskManagers
.taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
.taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
localhost.jobmanager.Status.JVM.Memory.Direct.Count
I do agree that a factory implementation with a static reporter would likely be a better approach, so I may explore that a bit more; a rough sketch of what I have in mind is below. I'd also like to make some changes to the existing, admittedly crude, implementation for handling the dynamic metrics. I did see several references to a MetricRegistry class, but I wasn't sure whether that was the most appropriate place to add this type of functionality, or whether it was needed at all.
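
Untested sketch (MetricReporterFactory and its createMetricReporter(Properties) method come from org.apache.flink.metrics.reporter; the "mock" reporter name and the spelled-out "metrics.reporter.<name>.factory.class" key are my own choices here, since I'm not sure ConfigConstants exposes a constant for the factory suffix):

import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.reporter.MetricReporter
import org.apache.flink.metrics.reporter.MetricReporterFactory
import java.util.Properties

// The factory always hands out the same static reporter instance, so the
// JobManager and TaskManager both report into one shared object.
class MockReporterFactory : MetricReporterFactory {
    override fun createMetricReporter(properties: Properties): MetricReporter = reporter

    companion object {
        val reporter = MockCustomMetricsReporter()
    }
}

val factoryConfiguration = Configuration.fromMap(mutableMapOf(
    "metrics.reporter.mock.factory.class" to MockReporterFactory::class.java.name
))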

Thanks much,

Rion



On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler <ches...@apache.org> wrote:

    Was there anything in the logs (ideally on debug)?
    Have you debugged the execution and followed the counter() calls
    all the way to the reporter?
    Do you only see JobManager metrics, or is there somewhere also
    something about the TaskManager?

    I can see several issues with your code, but none that would fully
    explain the behavior:

    a) your reporter is not thread-safe
    b) you only differentiate metrics by name, which will lead to
    quite a few collisions.
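
    For example, a reporter along these lines would address both points: a
    ConcurrentHashMap for thread safety, keyed by the fully-scoped identifier
    from getMetricIdentifier() rather than the bare name (sketch only; the
    class name is illustrative):

    import org.apache.flink.metrics.Metric
    import org.apache.flink.metrics.MetricConfig
    import org.apache.flink.metrics.MetricGroup
    import org.apache.flink.metrics.reporter.MetricReporter
    import java.util.concurrent.ConcurrentHashMap

    class ThreadSafeMockReporter : MetricReporter {
        override fun open(config: MetricConfig) {}
        override fun close() {}

        // Key by the fully-scoped identifier so metrics with the same name in
        // different groups (e.g. different tenants) do not collide.
        override fun notifyOfAddedMetric(metric: Metric, name: String, group: MetricGroup) {
            registered.putIfAbsent(group.getMetricIdentifier(name), metric)
        }

        override fun notifyOfRemovedMetric(metric: Metric, name: String, group: MetricGroup) {
            registered.remove(group.getMetricIdentifier(name))
        }

        companion object {
            // Thread-safe static store shared by the JM and TM reporter instances.
            val registered = ConcurrentHashMap<String, Metric>()
        }
    }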

    Also be aware that there will be 2 reporter instances: one for the
    JM and one for the TM.
    To remedy this, I would recommend creating a factory that returns
    a static reporter instance instead; overall this tends to be cleaner.

    Alternatively, when using the testing harnesses, IIRC you can also
    set a custom MetricGroup implementation.
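
    One way to use that idea without any cluster is to hand a capturing
    MetricGroup straight to CustomMetricsRegistry and assert against it
    directly. A rough sketch, assuming UnregisteredMetricsGroup (from
    org.apache.flink.metrics.groups) is available to delegate to; the class
    below is illustrative, not an existing Flink test utility:

    import org.apache.flink.metrics.Counter
    import org.apache.flink.metrics.MetricGroup
    import org.apache.flink.metrics.SimpleCounter
    import org.apache.flink.metrics.groups.UnregisteredMetricsGroup

    // Delegates everything to a no-op group, but records every counter that
    // is requested, keyed by its group-scoped name.
    class CapturingMetricGroup(
        private val prefix: String = "",
        val captured: MutableMap<String, Counter> = mutableMapOf()
    ) : MetricGroup by UnregisteredMetricsGroup() {

        override fun counter(name: String): Counter =
            SimpleCounter().also { captured[prefix + name] = it }

        // Child groups share the same backing map and extend the prefix.
        override fun addGroup(key: String, value: String): MetricGroup =
            CapturingMetricGroup("$prefix$key.$value.", captured)
    }

    // Hypothetical usage against the registry from the original post:
    // val group = CapturingMetricGroup()
    // CustomMetricsRegistry(group).inc("my-metric", "tenant1", 4)
    // check(group.captured["tenant.tenant1.my-metric"]?.count == 4L)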

    On 3/16/2021 4:13 AM, Rion Williams wrote:
    Hi all,

    Recently, I was working on adding some custom metrics to a Flink
    job that required the use of dynamic labels (i.e. capturing
    various counters that were "sliceable" by things like tenant /
    source, etc.).

    I ended up handling it in a very naive fashion that would just
    keep a dictionary of metrics that had already been registered and
    update them accordingly, which looked something like this:
    class MyCustomProcessFunction : ProcessFunction<Event, Unit>() {
        private lateinit var metrics: CustomMetricsRegistry

        override fun open(parameters: Configuration) {
            metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
        }

        override fun processElement(event: Event, context: Context, collector: Collector<Unit>) {
            // Insert calls like metrics.inc("some-metric", "tenant-name", 4) here
        }
    }

    class CustomMetricsRegistry(private val metricGroup: MetricGroup) : Serializable {
        // Increments a given metric by key
        fun inc(metric: String, tenant: String, amount: Long = 1) {
            // Store a key for the metric
            val key = "$metric-$tenant"

            // Store/register the metric if it hasn't been seen yet
            if (!registeredMetrics.containsKey(key)) {
                registeredMetrics[key] = metricGroup
                    .addGroup("tenant", tenant)
                    .counter(metric)
            }

            // Update the metric by a given amount
            registeredMetrics[key]!!.inc(amount)
        }

        companion object {
            private var registeredMetrics: HashMap<String, Counter> = hashMapOf()
        }
    }
    Basically, it registers and updates new metrics for tenants as
    they are encountered, and I've seen these being emitted as expected
    when hitting the appropriately configured metrics endpoint (using
    a PrometheusReporter).
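
    For context, the PrometheusReporter side is configured with the usual
    metrics.reporter.<name>.class and port keys (shown here in the same
    Configuration style as the test setup below; the values are illustrative,
    would normally live in flink-conf.yaml, and require flink-metrics-prometheus
    on the classpath):

    val prometheusConfiguration = Configuration.fromMap(mutableMapOf(
        "metrics.reporter.prom.class" to "org.apache.flink.metrics.prometheus.PrometheusReporter",
        // Illustrative port value
        "metrics.reporter.prom.port" to "9249"
    ))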

    However, while I was trying to write a few unit tests for this, I
    seemed to encounter an issue. I was following a Stack Overflow
    post that was answered by Chesnay Schepler [0] that described the use of an
    in-memory/embedded Flink cluster and a custom reporter that would
    statically expose the underlying metrics.

    So I took a shot at implementing something similar as follows:

    *Flink Cluster Definition*
    private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
        ConfigConstants.METRICS_REPORTER_PREFIX +
            "MockCustomMetricsReporter." +
            ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
            MockCustomMetricsReporter::class.java.name
    ))

    @ClassRule @JvmField
    val flinkCluster = MiniClusterResource(
        MiniClusterResourceConfiguration.Builder()
            .setConfiguration(metricsConfiguration)
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(1)
            .build()
    )
    *Custom Reporter*
    class MockCustomMetricsReporter : MetricReporter {

        override fun open(metricConfig: MetricConfig) {}
        override fun close() {}

        override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
            // Store the metrics that are being registered as we see them
            if (!registeredCustomMetrics.containsKey(name)) {
                registeredCustomMetrics[name] = metric
            }
        }

        override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
            // Do nothing here
        }

        companion object {
            // Static reference to metrics as they are registered
            var registeredCustomMetrics = HashMap<String, Metric>()
        }
    }
    *Example Test*
    @Test
    fun `Example Metrics Use Case`() {
        // Arrange
        val stream = StreamExecutionEnvironment.getExecutionEnvironment()
        val events = listOf(
            eventWithUsers("tenant1", "us...@testing.com"),
            eventWithUsers("tenant2", "us...@testing.com"),
        )

        // Act
        stream
            .fromCollection(events)
            .process(MyCustomProcessFunction())

        // Assert
        stream.execute()
        assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
    }
    While this test will pass, *the problem is that the custom
    metrics defined dynamically (via the CustomMetricsRegistry
    implementation) do not appear within the registeredCustomMetrics
    collection*. In fact, there are 21 metrics that get registered
    but all of them appear to be classic out-of-the-box metrics such
    as CPU usage, number of task managers, load, various other Netty
    and JVM stats, but no custom metrics are included.

    I've tried multiple different configurations, implementations via
    a custom TestHarness, etc. but for some reason the custom metrics
    being defined are never triggering the notifyOfAddedMetric
    function which would be responsible for adding them to the static
    collection to be asserted against.

    Any ideas / guidance would be more than welcome. Perhaps a
    different approach? Based on the examples I've encountered, the code
    seems like it should "just work".

    Thanks much,

    Rion

    [0]: https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink



