Hi David,

Thanks for your reply. To summarize: use a Counter:
counter = getRuntimeContext()
    .getMetricGroup()
    .addGroup("MyMetricsKey", "MyMetricsValue") // specify my value for each custom event_name here - I might not know all custom event_names in advance
    .counter("myCounter");

This MyMetricsValue will show up in Prometheus as, e.g.:
0.Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter
and so on for 1.Window(TumblingWindow...)..., one series per parallel operator instance.

This will then have to be aggregated in Prometheus over 5 secs for all the
<parallelismCount>.Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter // no task executors here - this is at operator level???

This is independent of task executors, right?? How does your statement - "Flink does not support aggregating operator-level metrics across task executors. This job is left to proper time-series databases." - relate to my summary above?

Also, I am assuming that the Counter will get reset after every window interval of 5 secs, or do I need to do counter.dec(counter.getCount()) in the close() method as you showed above?

TIA,

On Wed, Jul 29, 2020 at 2:53 AM Chesnay Schepler <ches...@apache.org> wrote:
> I'd recommend doing the aggregation over 5 seconds in Graphite/Prometheus
> etc., and exposing a counter in Flink for each attribute/event_name.
>
> User variables are a good choice for encoding the attribute/event_name
> values.
>
> As for your remaining questions:
>
> Flink does not support aggregating operator-level metrics across task
> executors. This job is left to proper time-series databases.
>
> A counter can be reset like this: counter.dec(counter.getCount())
> You can also create a custom implementation with whatever behavior you
> desire.
>
> The default meter implementation (MeterView) calculates the rate of events
> per second based on counts that are periodically gathered over some
> time period (usually 1 minute).
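The reset pattern counter.dec(counter.getCount()) and the lazily-created per-event_name counters can be sketched in plain Java. This is an illustrative stand-in, not Flink's Counter API; the class and method names (EventCounters, reset) are made up for the sketch:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Illustrative stand-in for a set of Flink counters, one per event_name,
// created lazily because the full set of event_names is not known in advance.
class EventCounters {
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    void inc(String eventName) {
        counters.computeIfAbsent(eventName, k -> new LongAdder()).increment();
    }

    long getCount(String eventName) {
        LongAdder adder = counters.get(eventName);
        return adder == null ? 0L : adder.sum();
    }

    // Mirrors counter.dec(counter.getCount()): read the current value and
    // subtract it, bringing the counter back to zero for the next window.
    void reset(String eventName) {
        LongAdder adder = counters.get(eventName);
        if (adder != null) {
            adder.add(-adder.sum());
        }
    }
}
```

The same lazy-creation idea applies to real Flink counters: keep a Map<String, Counter> in the operator and register a new counter (with the event_name as the user-variable value) the first time an unseen event_name arrives.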
> If you want to calculate the rate-per-second over the last 5 seconds, then
> new MeterView(5) should do the trick.
> If you want to have a rate-per-5-seconds, then you will need to implement
> a custom meter. Note that I would generally discourage this, as it will not
> work properly with some metric systems which assume rates to be per-second.
>
> On 27/07/2020 19:59, Vijay Balakrishnan wrote:
>
> Hi All,
> I am looking at the Custom User Metrics to count incoming records by an
> incoming attribute, event_name, and aggregate it over 5 secs.
> I looked at
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
> I am trying to figure out which one to use, Counter or Meter.
> If using Counter, how do I reset it after 5 secs?
> If using Meter, which measures avg throughput, how do I specify a
> duration like 5 secs? markEvent(long n)???
>
> I am also trying to collect the total count of events across all TaskManagers.
> Do I collect at
> flink_taskmanager_job_task_<customMetricName>_numrecordsIn or
> flink_taskmanager_job_task_operator_<customMetricName>_numrecordsIn??
> (so at task or operator level)
>
> Or should I use User variables like below:
>
> counter = getRuntimeContext()
>     .getMetricGroup()
>     .addGroup("MyMetricsKey", "MyMetricsValue") // specify my value for each custom event_name here - I might not know all custom event_names in advance
>     .counter("myCounter");
>
> Pardon my confusion here.
> TIA,
>
> On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi David,
>> Thanks for your reply.
>> I am already using the PrometheusReporter. I am trying to figure out how
>> to dig into the application data and count, grouped by an attribute called
>> event_name in the incoming application data, and report to Grafana via
>> Prometheus.
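The MeterView behavior described above can be made concrete with a small sketch. This is the idea, not Flink's actual implementation, and the class name RateOverWindow is invented for the example: a monotonically increasing counter value is sampled once per second into a ring buffer, and the rate is (newest - oldest) / timeSpan. Note the result of new MeterView(5) is still a per-second rate, just computed over the last 5 seconds:

```java
// Sketch of MeterView's idea (not Flink's actual class): a counter value is
// sampled once per second into a ring buffer; the rate is the difference
// between the newest and oldest sample divided by the time span in seconds.
class RateOverWindow {
    private final long[] samples; // one slot per second, plus one for the oldest sample
    private final int timeSpanSeconds;
    private int idx = 0;

    RateOverWindow(int timeSpanSeconds) {
        this.timeSpanSeconds = timeSpanSeconds;
        this.samples = new long[timeSpanSeconds + 1];
    }

    // Flink drives this once per second from a background thread; here it
    // would be invoked manually for illustration.
    void update(long currentCount) {
        idx = (idx + 1) % samples.length;
        samples[idx] = currentCount;
    }

    // Events per second, averaged over the last timeSpanSeconds.
    double getRatePerSecond() {
        long oldest = samples[(idx + 1) % samples.length];
        return (samples[idx] - oldest) / (double) timeSpanSeconds;
    }
}
```

A true rate-per-5-seconds meter would instead return samples[idx] - oldest without dividing, which is exactly the non-standard unit Chesnay warns about.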
>>
>> I see the following at a high level:
>> task_numRecordsIn
>> task_numRecordsOut
>> ..operator_numLateRecordsDropped
>>
>> Trying to dig in deeper than this numRecordsIn to get it grouped by the
>> event_name attribute coming in the input record every 5 secs.
>> TIA,
>>
>> On Sat, Jul 25, 2020 at 10:55 AM David Anderson <da...@alpinegizmo.com>
>> wrote:
>>
>>> Setting up a Flink metrics dashboard in Grafana requires setting up and
>>> configuring one of Flink's metrics reporters [1] that is supported by
>>> Grafana as a data source. That means your options for a metrics reporter
>>> are Graphite, InfluxDB, Prometheus, or the Prometheus push reporter.
>>>
>>> If you want reporting every 5 seconds, with the push-based reporters
>>> that's something you would configure in flink-conf.yaml, whereas with
>>> Prometheus you'll need to configure the scrape interval in the prometheus
>>> config file. For more on using Flink with Prometheus, see the blog post by
>>> Maximilian Bode [2].
>>>
>>> Best,
>>> David
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
>>> [2]
>>> https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html
>>>
>>> On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan <bvija...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I am trying to figure out how many records came into the Flink app from
>>>> KDS and how many records got moved to the next step or were dropped by the
>>>> watermarks.
>>>>
>>>> I see on the UI table *Source: Records Sent* with a total, and the
>>>> next step, the *Filter->FlatMap operator, with a Records Received* total.
>>>> How can I get these metric values to display in Grafana, e.g. as I
>>>> want to know, for each 5 secs, how many records came in and how many
>>>> were filtered out by the watermark or my custom Filter operator etc.?
>>>>
>>>> I looked at the breakdown of the Source__Custom_Source in Metrics as
>>>> shown in the attached pic.
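The two configuration points mentioned above (the reporter in flink-conf.yaml, the scrape interval in Prometheus' own config) might look roughly like the following sketch. The property keys match the Flink 1.x Prometheus reporter documentation, but the port range, hostnames, and 5s interval are illustrative assumptions:

```yaml
# flink-conf.yaml: expose metrics via the (pull-based) Prometheus reporter
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260

# prometheus.yml: scrape the TaskManagers every 5 seconds (targets are examples)
scrape_configs:
  - job_name: flink
    scrape_interval: 5s
    static_configs:
      - targets: ['taskmanager-host:9250', 'taskmanager-host:9251']
```

With the pull-based reporter, the 5-second granularity lives entirely on the Prometheus side; Flink just serves the current counter values whenever it is scraped.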
>>>> It has values like 0.NumRecordsIn and
>>>> 0.NumRecordsOut and so on, from 0 to 9 for the parallelism of 10 I specified.
>>>> It also has various breakdowns like 0.Timestamps/Watermarks.numRecordsIn
>>>> and 0.Timestamps/Watermarks.numRecordsOut.
>>>>
>>>> Attached are some screenshots of the Flink Dashboard UI.
>>>>
>>>> TIA,
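Summing those per-subtask series (0.NumRecordsIn through 9.NumRecordsIn) across subtasks and TaskManagers is the aggregation job that, per the thread, belongs to the time-series database. A hedged PromQL sketch follows; the metric and label names vary with Flink version and scope configuration, so check what your reporter actually exposes before using them:

```promql
# Total across all parallel subtasks and TaskManagers (label name is an assumption)
sum(flink_taskmanager_job_task_operator_numRecordsIn{operator_name="Source__Custom_Source"})

# Records received in the last 5 seconds, e.g. for a per-5-secs Grafana panel
sum(increase(flink_taskmanager_job_task_operator_numRecordsIn{operator_name="Source__Custom_Source"}[5s]))
```

Because the counter keeps increasing and increase()/rate() handle windowing in the query, no in-Flink reset of the counter is needed for this style of dashboard.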