If you do the aggregation in Prometheus, I would think you do not need to reset the counter, but it's been a while since I've used it.
Flink will not automatically reset counters.
If this is necessary then you will have to manually reset the counter every 5 seconds.

The name under which it will be exposed to Prometheus depends on the configured scope format; see the metrics documentation for details. By default it will contain information about the task executor, job, task, etc.

On 30/07/2020 22:02, Vijay Balakrishnan wrote:
Hi David,
Thx for your reply.

To summarize:
Use a Counter:
counter = getRuntimeContext()
    .getMetricGroup()
    .addGroup("MyMetricsKey", "MyMetricsValue") // specify my value for each custom event_name here - I might not know all custom event_names in advance
    .counter("myCounter");

This MyMetricsValue will show up in Prometheus as, e.g.:
0.Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter,
and so on for 1.Window(TumblingWindow...)... for each parallel operator.

This will then have to be aggregated in Prometheus for 5 secs for all the
<parallelismCount>.Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter
// no task executors here - this is at Operator level ???
This is independent of task executors, right?? How does your statement -
"Flink does not support aggregating operator-level metrics across task
executors. This job is left to proper time-series databases." - relate to
the summary above from me?
Also, I am assuming that the Counter will get reset after every Window interval
of 5 secs, or do I need to do counter.dec(counter.getCount()) in the close()
method as you showed above?
TIA,

On Wed, Jul 29, 2020 at 2:53 AM Chesnay Schepler <ches...@apache.org <mailto:ches...@apache.org>> wrote:

    I'd recommend doing the aggregation over 5 seconds in
    Graphite/Prometheus etc., and exposing a counter in Flink for each
    attribute/event_name.

    User variables are a good choice for encoding the
    attribute/event_name values.
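    To make the user-variable approach concrete, here is a minimal plain-Java sketch of lazily creating one counter per event_name as records arrive, since the names may not be known in advance. A ConcurrentHashMap stands in for the metric group so no Flink runtime is needed; the class and method names are illustrative, not Flink API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: one counter per event_name, created on first sight.
// In Flink, the computeIfAbsent callback would instead call
// getRuntimeContext().getMetricGroup().addGroup("event_name", name).counter("myCounter").
public class PerEventCounters {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    // Called once per incoming record.
    public void record(String eventName) {
        counters.computeIfAbsent(eventName, n -> new AtomicLong()).incrementAndGet();
    }

    public long count(String eventName) {
        AtomicLong c = counters.get(eventName);
        return c == null ? 0L : c.get();
    }

    public static void main(String[] args) {
        PerEventCounters c = new PerEventCounters();
        c.record("click");
        c.record("click");
        c.record("view");
        System.out.println(c.count("click")); // 2
    }
}
```

    The reporter then sees one time series per event_name, and the per-5-second aggregation is left to Prometheus.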

    As for your remaining questions:

    Flink does not support aggregating operator-level metrics across
    task executors. This job is left to proper time-series databases.

    A counter can be reset like this: counter.dec(counter.getCount())
    You can also create a custom implementation with whatever behavior
    you desire.
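    As a sketch of that reset idiom (plain Java; the SimpleCounter below only mirrors the inc/dec/getCount shape of Flink's Counter interface and is not the real class):

```java
// Minimal counter mirroring the inc()/dec(n)/getCount() shape of
// Flink's Counter interface; declared locally so the example is
// self-contained. Not thread-safe.
class SimpleCounter {
    private long count;
    void inc() { count++; }
    void dec(long n) { count -= n; }
    long getCount() { return count; }
}

public class CounterResetDemo {
    public static void main(String[] args) {
        SimpleCounter counter = new SimpleCounter();
        counter.inc();
        counter.inc();
        counter.inc();
        // The reset idiom: subtract whatever the current count is.
        counter.dec(counter.getCount());
        System.out.println(counter.getCount()); // 0
    }
}
```

    Note that with concurrent updates this read-then-subtract is not atomic; increments arriving between getCount() and dec() survive the reset, which is usually what you want.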

    The default meter implementation (MeterView) calculates the rate of
    events per second based on counts that are periodically gathered
    over some time period (usually 1 minute). If you want to calculate
    the rate-per-second over the last 5 seconds, then new MeterView(5)
    should do the trick.
    If you want to have a rate-per-5-seconds, then you will need to
    implement a custom meter. Note that I would generally discourage
    this as it will not work properly with some metric systems which
    assume rates to be per-second.
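    To illustrate what a MeterView-style rate does, here is a plain-Java sketch (not Flink's actual implementation, whose details differ): the counter is sampled once per second into a ring buffer, and the rate is the count delta across the buffer divided by the time span.

```java
// Sketch of a MeterView-style per-second rate over a configurable window.
// A scheduler would call update(counter.getCount()) once per second;
// scheduling is omitted here to keep the example deterministic.
public class RateOverWindow {
    private final long[] samples; // one slot per one-second sample
    private int next;             // index of the slot to write next

    public RateOverWindow(int timeSpanInSeconds) {
        // one extra slot so newest and oldest are timeSpanInSeconds apart
        this.samples = new long[timeSpanInSeconds + 1];
    }

    // Called once per second with the counter's current total.
    public void update(long currentCount) {
        samples[next] = currentCount;
        next = (next + 1) % samples.length;
    }

    // Events per second averaged over the window.
    public double getRate() {
        long newest = samples[(next + samples.length - 1) % samples.length];
        long oldest = samples[next]; // the next slot to be overwritten is the oldest
        return (double) (newest - oldest) / (samples.length - 1);
    }

    public static void main(String[] args) {
        RateOverWindow rate = new RateOverWindow(5);
        // simulate a counter growing by 5 events per second for 6 samples
        for (int i = 0; i <= 5; i++) {
            rate.update(i * 5L);
        }
        System.out.println(rate.getRate()); // 5.0
    }
}
```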

    On 27/07/2020 19:59, Vijay Balakrishnan wrote:
    Hi All,
    I am looking at the Custom User Metrics to count incoming records
    by an incoming attribute, event_name, and aggregate them over 5 secs.
    I looked at
    
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter.
    I am trying to figure out which one to use, Counter or Meter.
    If using Counter, how do I reset it after 5 secs?
    If using Meter, which measures avg throughput, how do I specify a
    duration like 5 secs? markEvent(long n)???

    I am also trying to collect total count of events across all
    TaskManagers.
    Do I collect at
    flink_taskmanager_job_task_<customMetricName>_numrecordsIn or
    flink_taskmanager_job_task_operator_<customMetricName>_numrecordsIn
    ?? (i.e., at the task or operator level?)

    Or should I use User variables like below:
    counter = getRuntimeContext()
        .getMetricGroup()
        .addGroup("MyMetricsKey", "MyMetricsValue") // specify my value for each custom event_name here - I might not know all custom event_names in advance
        .counter("myCounter");

    Pardon my confusion here.
    TIA,

    On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan
    <bvija...@gmail.com <mailto:bvija...@gmail.com>> wrote:

        Hi David,
        Thanks for your reply.
        I am already using the PrometheusReporter. I am trying to
        figure out how to dig into the application data and count
        grouped by an attribute called event_name in the incoming
        application data and report to Grafana via Prometheus.

        I see the following at a high level
        task_numRecordsIn
        task_numRecordsOut
        ..operator_numLateRecordsDropped

        Trying to dig in deeper than this numRecordsIn, to get grouped
        by the event_name attribute coming in the input record every 5 secs.
        TIA,

        On Sat, Jul 25, 2020 at 10:55 AM David Anderson
        <da...@alpinegizmo.com <mailto:da...@alpinegizmo.com>> wrote:

            Setting up a Flink metrics dashboard in Grafana requires
            setting up and configuring one of Flink's metrics
            reporters [1] that is supported by Grafana as a data
            source. That means your options for a metrics reporter
            are Graphite, InfluxDB, Prometheus, or the Prometheus
            push reporter.

            If you want reporting every 5 seconds, with the push
            based reporters that's something you would configure in
            flink-conf.yaml, whereas with Prometheus you'll need to
            configure the scrape interval in the prometheus config
            file. For more on using Flink with Prometheus, see the
            blog post by Maximilian Bode [2].
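            For reference, a minimal scrape-interval setup in
            prometheus.yml could look like the fragment below (the job
            name and target address are placeholders for your
            deployment, not values from this thread; 9249 is the
            PrometheusReporter's default port):

```yaml
scrape_configs:
  - job_name: 'flink'        # placeholder name
    scrape_interval: 5s      # poll the reporter every 5 seconds
    static_configs:
      - targets: ['localhost:9249']  # PrometheusReporter default port
```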

            Best,
            David

            [1]
            
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
            [2]
            
https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html

            On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan
            <bvija...@gmail.com <mailto:bvija...@gmail.com>> wrote:

                Hi,
                I am trying to figure out how many records came into
                the Flink App from KDS, and how many records got moved
                to the next step or were dropped by the watermarks.

                I see in the UI table, for the *Source*, a *Records
                Sent* total, and for the next step, the *Filter->FlatMap*
                operator, a *Records Received* total. How can I get
                these metric values to display in Grafana? E.g., I want
                to know, for each 5 secs, how many records came in and
                how many were filtered out by the watermark or my custom
                Filter operator, etc.?

                I looked at the breakdown of the
                Source__Custom_Source in Metrics as shown in the
                attached pic. It has values like 0.NumRecordsIn and
                0.NumRecordsOut and so on from 0 to 9 for the
                parallelism 10 I specified. It also has various
                breakdowns like 0.Timestamps/Watermarks.numRecordsIn
                and 0.Timestamps/Watermarks.numRecordsOut.

                Attached are some screenshots of the Flink DashBoard UI.

                TIA,


