I'd recommend doing the aggregation over 5 seconds in Graphite/Prometheus etc., and exposing a counter in Flink for each attribute/event_name.

User variables are a good choice for encoding the attribute/event_name values.
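For example, here is a minimal sketch of how such per-event_name counters could be registered lazily inside a rich function ("Event" and getEventName() are placeholders standing in for your record type, not part of your code):

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.util.Collector;

    import java.util.HashMap;
    import java.util.Map;

    // "Event" and getEventName() are placeholders for your record type.
    public class EventNameCountingFlatMap extends RichFlatMapFunction<Event, Event> {

        private transient Map<String, Counter> countersByEventName;

        @Override
        public void open(Configuration parameters) {
            countersByEventName = new HashMap<>();
        }

        @Override
        public void flatMap(Event event, Collector<Event> out) {
            // lazily register one counter per event_name, since the
            // names may not be known in advance
            countersByEventName
                .computeIfAbsent(event.getEventName(), name ->
                    getRuntimeContext()
                        .getMetricGroup()
                        .addGroup("event_name", name) // user variable: key and value
                        .counter("numEvents"))
                .inc();
            out.collect(event);
        }
    }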

As for your remaining questions:

Flink does not support aggregating operator-level metrics across task executors. This job is left to proper time-series databases.

A counter can be reset like this: counter.dec(counter.getCount())
You can also create a custom implementation with whatever behavior you desire.
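For example, if the reset is driven from a timer that fires every 5 seconds (the timer wiring itself is not shown and is left as an assumption), it boils down to:

    // e.g. inside a processing-time callback that fires every 5 seconds
    counter.dec(counter.getCount()); // brings the count back to zero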

The default meter implementation (MeterView) calculates the rate of events per second based on counts that are periodically gathered over some time period (usually 1 minute). If you want to calculate the rate per second over the last 5 seconds, then new MeterView(5) should do the trick. If you want a rate per 5 seconds, then you will need to implement a custom meter. Note that I would generally discourage this, as it will not work properly with some metric systems that assume rates to be per second.
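As a sketch of the first option (the group and metric names are just placeholders), registered in a rich function's open():

    import org.apache.flink.metrics.Meter;
    import org.apache.flink.metrics.MeterView;

    // per-second rate, averaged over the last 5 seconds
    Meter eventsPerSecond = getRuntimeContext()
        .getMetricGroup()
        .addGroup("event_name", "my_event") // placeholder user variable
        .meter("eventsPerSecond", new MeterView(5));

    // then, once per matching record in the processing method:
    eventsPerSecond.markEvent();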

On 27/07/2020 19:59, Vijay Balakrishnan wrote:
Hi All,
I am looking at the Custom User Metrics to count incoming records by an incoming attribute, event_name, and aggregate it over 5 secs. I looked at https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter.
I am trying to figure out which one to use: Counter or Meter.
If using Counter, how do I reset it after 5 secs?
If using Meter, which measures avg throughput, how do I specify a duration like 5 secs? markEvent(long n)???

I am also trying to collect the total count of events across all TaskManagers.
Do I collect at flink_taskmanager_job_task_<customMetricName>_numrecordsIn or flink_taskmanager_job_task_operator_<customMetricName>_numrecordsIn (i.e., at the task level or the operator level)?

Or should I use User variables like below:
counter = getRuntimeContext()
    .getMetricGroup()
    .addGroup("MyMetricsKey", "MyMetricsValue") // specify my value for each custom event_name here - I might not know all custom event_names in advance
    .counter("myCounter");

Pardon my confusion here.
TIA,

On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan <bvija...@gmail.com <mailto:bvija...@gmail.com>> wrote:

    Hi David,
    Thanks for your reply.
    I am already using the PrometheusReporter. I am trying to figure
    out how to dig into the application data, count it grouped by an
    attribute called event_name in the incoming application data, and
    report that to Grafana via Prometheus.

    I see the following at a high level
    task_numRecordsIn
    task_numRecordsOut
    ..operator_numLateRecordsDropped

    Trying to dig in deeper than this numRecordsIn to get it grouped
    by the event_name attribute coming in the input record every 5 secs.
    TIA,

    On Sat, Jul 25, 2020 at 10:55 AM David Anderson
    <da...@alpinegizmo.com <mailto:da...@alpinegizmo.com>> wrote:

        Setting up a Flink metrics dashboard in Grafana requires
        setting up and configuring one of Flink's metrics reporters
        [1] that is supported by Grafana as a data source. That means
        your options for a metrics reporter are Graphite, InfluxDB,
        Prometheus, or the Prometheus push reporter.

        If you want reporting every 5 seconds, with the push-based
        reporters that's something you would configure in
        flink-conf.yaml, whereas with Prometheus you'll need to
        configure the scrape interval in the Prometheus config file.
        For more on using Flink with Prometheus, see the blog post by
        Maximilian Bode [2].
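
        As a sketch (the reporter name and port below are just the
        documented defaults, adjust as needed), the Flink side in
        flink-conf.yaml could look like:

            metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
            metrics.reporter.prom.port: 9249

        and the 5-second interval on the Prometheus side like:

            scrape_configs:
              - job_name: 'flink'
                scrape_interval: 5s
                static_configs:
                  - targets: ['localhost:9249']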

        Best,
        David

        [1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
        [2] https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html

        On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan
        <bvija...@gmail.com <mailto:bvija...@gmail.com>> wrote:

            Hi,
            I am trying to figure out how many records came into the
            Flink App from KDS and how many records got moved to the
            next step or were dropped by the watermarks.

            In the UI table I see *Records Sent* with a total for the
            Source, and *Records Received* with a total for the next
            step, the Filter->FlatMap operator. How can I get these
            metric values to display in Grafana? For example, for each
            5 secs I want to know how many records came in and how many
            were filtered out by the watermark or my custom Filter
            operator, etc.

            I looked at the breakdown of the Source__Custom_Source in
            Metrics, as shown in the attached pic. It has values like
            0.NumRecordsIn and 0.NumRecordsOut and so on from 0 to 9
            for the parallelism 10 I specified. It also has various
            breakdowns like 0.Timestamps/Watermarks.numRecordsIn and
            0.Timestamps/Watermarks.numRecordsOut

            Attached are some screenshots of the Flink DashBoard UI.

            TIA,

