Actually you'd have to further subclass the operatorMetricGroup such that addGroup works as expected.
This is admittedly a bit of a drag :/

On 3/16/2021 4:35 PM, Chesnay Schepler wrote:
The test harness is fully independent of the MiniClusterResource; it isn't actually running a job. That's why your metrics never arrive at the reporter.

You can either:
a) use the test harness with a custom MetricGroup implementation that intercepts registered metrics, set in the MockEnvironment, or
b) use the function as part of a job with the custom reporter approach (essentially, fromElements -> function -> discarding sink).

The following would work for a), but it must be noted that this relies on quite a few things that are internal to Flink:

...
InterceptingOperatorMetricGroup operatorMetricGroup =
        new InterceptingOperatorMetricGroup();
InterceptingTaskMetricGroup taskMetricGroup =
        new InterceptingTaskMetricGroup() {
            @Override
            public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
                return operatorMetricGroup;
            }
        };
new MockEnvironmentBuilder()
    .setMetricGroup(taskMetricGroup)

...
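To make the addGroup() caveat concrete, here is a plain-Java model of the interception idea. It is a sketch under assumptions, not Flink code: InterceptingGroupModel is a hypothetical class that only mirrors the shape of MetricGroup#addGroup(key, value) and counter(name), and it uses LongAdder in place of Flink's Counter. What it shows is that child groups returned by addGroup() must write into the same shared map as the root; otherwise counters registered on a per-tenant subgroup never reach the test's assertions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical model of an "intercepting" metric group (NOT Flink's
 * InterceptingOperatorMetricGroup). The root and every child created via
 * addGroup(key, value) share one map, so nothing registered on a subgroup is lost.
 */
final class InterceptingGroupModel {
    // Shared by the root group and all of its descendants.
    private final Map<String, LongAdder> sink;
    // Dotted scope of this group, e.g. "" for the root or "tenant.tenant-1".
    private final String scope;

    InterceptingGroupModel() {
        this(new ConcurrentHashMap<>(), "");
    }

    private InterceptingGroupModel(Map<String, LongAdder> sink, String scope) {
        this.sink = sink;
        this.scope = scope;
    }

    /** Mirrors the shape of MetricGroup#addGroup(String key, String value). */
    InterceptingGroupModel addGroup(String key, String value) {
        String child = scope.isEmpty() ? key + "." + value : scope + "." + key + "." + value;
        // The child writes into the SAME sink; this is the part that must be overridden.
        return new InterceptingGroupModel(sink, child);
    }

    /** Registers a counter under scope + name, or returns the one already registered. */
    LongAdder counter(String name) {
        String id = scope.isEmpty() ? name : scope + "." + name;
        return sink.computeIfAbsent(id, k -> new LongAdder());
    }

    /** Everything registered so far, for test assertions. */
    Map<String, LongAdder> registered() {
        return sink;
    }
}
```

In a test you would call root.addGroup("tenant", "tenant-1").counter("events_seen") and then assert on registered(). For simplicity this model returns the already registered counter when a name repeats; check how your Flink version treats duplicate names before relying on that detail.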

On 3/16/2021 3:42 PM, Rion Williams wrote:
In this case, I was using a harness to test the function. Honestly, though, I care far less about the unit tests around metrics than about having something that actually runs and works as intended within a job. The only real problem I want to solve is building metrics whose labels vary based on the incoming data (e.g. keeping track of the events I've seen for a given tenant, or by some other property).

Something like:

<metric prefix>_events_seen { tenant = "tenant-1" } 1.0
<metric prefix>_events_seen { tenant = "tenant-2" } 200.0

Hopefully that makes sense. I've used the Prometheus client previously to build these kinds of metrics, but since I'm fairly new to the Flink world, I was trying to use the built-in constructs instead (hence the dynamically added groups / metrics).

On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler <ches...@apache.org> wrote:

    Are you actually running a job, or are you using a harness for
    testing your function?

    On 3/16/2021 3:24 PM, Rion Williams wrote:
    Hi Chesnay,

    Thanks for the prompt response and feedback, it's very much
    appreciated. Please see the inline responses below to your
    questions:

        *Was there anything in the logs (ideally on debug)?*


    I didn't see anything within the logs that seemed to indicate
    anything out of the ordinary. I'm currently using a
    MiniClusterResource for this and attempted to set the logging
    levels to pick up everything (i.e. ALL), but if there's a way to
    expose more, I'm not aware of it.

        *Have you debugged the execution and followed the counter()
        calls all the way to the reporter?*


    With the debugger, I traced one of the counter initializations,
    and it seems that no reporters were being found within the
    register() call in MetricRegistryImpl (i.e. this.reporters
    has no registered reporters):
    if (this.reporters != null) {
        for (int i = 0; i < this.reporters.size(); ++i) {
            MetricRegistryImpl.ReporterAndSettings reporterAndSettings =
                    (MetricRegistryImpl.ReporterAndSettings) this.reporters.get(i);

            try {
                if (reporterAndSettings != null) {
                    FrontMetricGroup front =
                            new FrontMetricGroup(reporterAndSettings.getSettings(), group);
                    reporterAndSettings.getReporter().notifyOfAddedMetric(metric, metricName, front);
                }
            } catch (Exception var11) {
                LOG.warn("Error while registering metric: {}.", metricName, var11);
            }
        }
    }
    Perhaps this is an error on my part, as I had assumed the
    following would be sufficient to register my reporter (within a
    local / MiniCluster environment):
    private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
        ConfigConstants.METRICS_REPORTER_PREFIX +
            "MockCustomMetricsReporter." +
            ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name
    ))

    @ClassRule
    @JvmField
    val flink = MiniClusterResource(
        MiniClusterResourceConfiguration.Builder()
            .setConfiguration(metricsConfiguration)
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(1)
            .build()
    )
    However, the reporter is clearly being picked up for the built-in
    metrics, since those trigger the notifyOfAddedMetric() function
    within the reporter itself; it just never gets notified about
    these custom ones.

        *Do you only see JobManager metrics, or is there somewhere
        also something about the TaskManager?*


    It looks like there are metrics coming from both the JobManager
    and the TaskManagers, judging by the following examples that were
    being reported:
    localhost.jobmanager.numRegisteredTaskManagers
    .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
    .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
    localhost.jobmanager.Status.JVM.Memory.Direct.Count
    I do agree that a factory implementation with a static reporter
    would likely be a better approach, so I may explore that a bit
    more, along with making some changes to the existing (admittedly
    hacky) implementation for handling the dynamic metrics. I did
    see several references to a MetricRegistry class, but I wasn't
    sure whether that was the most appropriate place to add this
    type of functionality, or whether it was needed at all.

    Thanks much,

    Rion



    On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler
    <ches...@apache.org> wrote:

        Was there anything in the logs (ideally on debug)?
        Have you debugged the execution and followed the counter()
        calls all the way to the reporter?
        Do you only see JobManager metrics, or is there somewhere
        also something about the TaskManager?

        I can see several issues with your code, but none that would
        fully explain the issue:

        a) your reporter is not thread-safe
        b) you only differentiate metrics by name, which will lead
        to quite a few collisions.

        Also be aware that there will be 2 reporter instances: one
        for the JM and one for the TM.
        To remedy this, I would recommend creating a factory that
        returns a static reporter instance instead; overall this
        tends to be cleaner.
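Points a) and b) and the shared static instance can be sketched together in plain Java. SharedMetricStore is a hypothetical helper, not part of Flink: a single process-wide ConcurrentHashMap that both the JM and TM reporter instances write into, keyed by the fully qualified metric identifier instead of the bare name. In an actual reporter, notifyOfAddedMetric(metric, name, group) would pass group.getMetricIdentifier(name) as the key; the metric is typed as Object here only to keep the sketch free of Flink dependencies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical process-wide store that every reporter instance (JM and TM)
 * writes into. Keys are fully qualified metric identifiers, not bare names,
 * so "events_seen" under two different tenant groups stays distinct.
 */
final class SharedMetricStore {
    // One map per JVM; safe for concurrent reporter callbacks.
    private static final Map<String, Object> METRICS = new ConcurrentHashMap<>();

    private SharedMetricStore() {}

    /** Call from notifyOfAddedMetric; identifier would be group.getMetricIdentifier(name). */
    static void added(String identifier, Object metric) {
        METRICS.put(identifier, metric);
    }

    /** Call from notifyOfRemovedMetric with the same identifier. */
    static void removed(String identifier) {
        METRICS.remove(identifier);
    }

    /** Immutable copy for test assertions. */
    static Map<String, Object> snapshot() {
        return Map.copyOf(METRICS);
    }

    /** Resets the store so each test starts clean. */
    static void clear() {
        METRICS.clear();
    }
}
```

A reporter factory (Flink's MetricReporterFactory, registered via metrics.reporter.<name>.factory.class, assuming your Flink version supports reporter factories) could hand out reporters that all delegate to this store; calling clear() before each test keeps runs independent.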

        Alternatively, when using the testing harnesses, IIRC you can
        also set a custom MetricGroup implementation.

        On 3/16/2021 4:13 AM, Rion Williams wrote:
        Hi all,

        Recently, I was working on adding some custom metrics to a
        Flink job that required the use of dynamic labels (i.e.
        capturing various counters that were "sliceable" by things
        like tenant / source, etc.).

        I ended up handling it in a very naive fashion that would
        just keep a dictionary of metrics that had already been
        registered and update them accordingly which looked
        something like this:
        class MyCustomProcessFunction : ProcessFunction<Event, Unit>() {
            private lateinit var metrics: CustomMetricsRegistry

            override fun open(parameters: Configuration) {
                metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
            }

            override fun processElement(event: Event, context: Context, collector: Collector<Unit>) {
                // Insert calls like metrics.inc("tenant-name", 4) here
            }
        }

        class CustomMetricsRegistry(private val metricGroup: MetricGroup) : Serializable {
            // Increments a given metric by key
            fun inc(metric: String, tenant: String, amount: Long = 1) {
                // Store a key for the metric
                val key = "$metric-$tenant"

                // Store/register the metric
                if (!registeredMetrics.containsKey(key)) {
                    registeredMetrics[key] = metricGroup
                        .addGroup("tenant", tenant)
                        .counter(metric)
                }

                // Update the metric by a given amount
                registeredMetrics[key]!!.inc(amount)
            }

            companion object {
                private var registeredMetrics: HashMap<String, Counter> = hashMapOf()
            }
        }
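One way to address the shared companion-object map is a per-instance cache created in open(). This is a plain-Java sketch with hypothetical names: TenantCounterCache is not a Flink class, the registrar function stands in for metricGroup.addGroup("tenant", tenant).counter(metric), and LongAdder stands in for Flink's Counter. Two things differ from the code above: the cache is not static, so each parallel subtask (and each test run in the same JVM) keeps counters bound to its own metric group, and nested maps replace the "$metric-$tenant" string key, which can collide (metric "events" with tenant "a-b" and metric "events-a" with tenant "b" both produce "events-a-b").

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiFunction;

/**
 * Hypothetical per-instance counter cache; create one in open() so each
 * parallel subtask keeps counters bound to its own metric group.
 */
final class TenantCounterCache {
    // Stand-in for (metric, tenant) -> metricGroup.addGroup("tenant", tenant).counter(metric)
    private final BiFunction<String, String, LongAdder> registrar;
    // metric name -> (tenant -> counter); nesting avoids string-concatenation key collisions
    private final Map<String, Map<String, LongAdder>> counters = new HashMap<>();

    TenantCounterCache(BiFunction<String, String, LongAdder> registrar) {
        this.registrar = registrar;
    }

    /** Registers the counter on first use for this (metric, tenant), then increments it. */
    void inc(String metric, String tenant, long amount) {
        counters
            .computeIfAbsent(metric, m -> new HashMap<>())
            .computeIfAbsent(tenant, t -> registrar.apply(metric, t))
            .add(amount);
    }

    /** Number of distinct (metric, tenant) counters registered so far. */
    int registeredCount() {
        int n = 0;
        for (Map<String, LongAdder> byTenant : counters.values()) {
            n += byTenant.size();
        }
        return n;
    }
}
```

Because a given function instance is, as far as I know, only invoked from its own task thread, a plain HashMap should suffice once the cache is no longer shared across instances.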
        Basically registering and updating new metrics for tenants
        as they are encountered, which I've seen being emitted as
        expected via hitting the appropriately configured metrics
        endpoint (using a PrometheusReporter).

        However, while I was trying to write a few unit tests for
        this, I seemed to encounter an issue. I was following a
        Stack Overflow post that was answered by @Chesnay Schepler
        [0] that described the use of
        an in-memory/embedded Flink cluster and a custom reporter
        that would statically expose the underlying metrics.

        So I took a shot at implementing something similar as follows:

        *Flink Cluster Definition*
        private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
            ConfigConstants.METRICS_REPORTER_PREFIX +
                "MockCustomMetricsReporter." +
                ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to MockCustomMetricsReporter::class.java.name
        ))

        @ClassRule
        @JvmField
        val flinkCluster = MiniClusterResource(
            MiniClusterResourceConfiguration.Builder()
                .setConfiguration(metricsConfiguration)
                .setNumberTaskManagers(1)
                .setNumberSlotsPerTaskManager(1)
                .build()
        )
        *Custom Reporter*
        class MockCustomMetricsReporter : MetricReporter {

            override fun open(metricConfig: MetricConfig) {}

            override fun close() {}

            override fun notifyOfAddedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
                // Store the metrics that are being registered as we see them
                if (!registeredCustomMetrics.containsKey(name)) {
                    registeredCustomMetrics[name] = metric
                }
            }

            override fun notifyOfRemovedMetric(metric: Metric, name: String, metricGroup: MetricGroup) {
                // Do nothing here
            }

            companion object {
                // Static reference to metrics as they are registered
                var registeredCustomMetrics = HashMap<String, Metric>()
            }
        }
        *Example Test*
        @Test
        fun `Example Metrics Use Case`() {
            // Arrange
            val stream = StreamExecutionEnvironment.getExecutionEnvironment()
            val events = listOf(
                eventWithUsers("tenant1", "us...@testing.com"),
                eventWithUsers("tenant2", "us...@testing.com"),
            )

            // Act
            stream
                .fromCollection(events)
                .process(MyCustomProcessFunction())

            // Assert
            stream.execute()
            assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
        }
        While this test will pass, *the problem is that the custom
        metrics defined dynamically (via the CustomMetricsRegistry
        implementation) do not appear within the
        registeredCustomMetrics collection*. In fact, there are 21
        metrics that get registered but all of them appear to be
        classic out-of-the-box metrics such as CPU usage, number of
        task managers, load, various other Netty and JVM stats, but
        no custom metrics are included.

        I've tried multiple different configurations,
        implementations via a custom TestHarness, etc. but for some
        reason the custom metrics being defined are never
        triggering the notifyOfAddedMetric function which would be
        responsible for adding them to the static collection to be
        asserted against.

        Any ideas / guidance would be more than welcome. Perhaps a
        different approach? Based off examples I've encountered,
        the code seems like it should "just work".

        Thanks much,

        Rion

        [0]: https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink





