I'd recommend doing the 5-second aggregation in Graphite/Prometheus etc.,
and exposing a counter in Flink for each attribute/event_name.
User variables are a good choice for encoding the attribute/event_name
values.
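A rough, untested sketch of what that could look like (Event and
getEventName() are just placeholders for your record type and attribute
accessor):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;

    public class EventNameCountingMap extends RichMapFunction<Event, Event> {

        // one counter per event_name, created lazily as new names show up
        private transient Map<String, Counter> countersByEventName;

        @Override
        public void open(Configuration parameters) {
            countersByEventName = new HashMap<>();
        }

        @Override
        public Event map(Event event) {
            countersByEventName
                .computeIfAbsent(event.getEventName(), name ->
                    getRuntimeContext()
                        .getMetricGroup()
                        .addGroup("event_name", name) // user variable: key "event_name", value = the attribute
                        .counter("eventCount"))
                .inc();
            return event;
        }
    }

With the PrometheusReporter the user variable should then show up as a
label on the metric, which is what you want for grouping in Grafana.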
As for your remaining questions:
Flink does not support aggregating operator-level metrics across task
executors. This job is left to proper time-series databases.
A counter can be reset like this: counter.dec(counter.getCount())
You can also create a custom implementation with whatever behavior you
desire.
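For illustration, an untested sketch of such a custom counter with a
reset (ResettableCounter is just a made-up name):

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.flink.metrics.Counter;

    public class ResettableCounter implements Counter {
        private final AtomicLong count = new AtomicLong();

        @Override public void inc() { count.incrementAndGet(); }
        @Override public void inc(long n) { count.addAndGet(n); }
        @Override public void dec() { count.decrementAndGet(); }
        @Override public void dec(long n) { count.addAndGet(-n); }
        @Override public long getCount() { return count.get(); }

        // read the current value and reset to zero in one step
        public long getCountAndReset() { return count.getAndSet(0); }
    }

You'd register it via
getRuntimeContext().getMetricGroup().counter("myCounter", new ResettableCounter()).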
The default meter implementation (MeterView) calculates the rate of
events per second based on counts that are periodically gathered over
some time period (usually 1 minute). If you want to calculate the
rate-per-second over the last 5 seconds, then new MeterView(5) should do
the trick.
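In code that would look roughly like this (untested; assumes you
register it from a RichFunction):

    import org.apache.flink.metrics.Meter;
    import org.apache.flink.metrics.MeterView;

    // inside open() of a RichFunction:
    Meter eventsPerSecond = getRuntimeContext()
        .getMetricGroup()
        .meter("eventsPerSecond", new MeterView(5)); // events/second, averaged over the last 5 seconds

    // for every record that comes in:
    eventsPerSecond.markEvent();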
If you want to have a rate-per-5-seconds, then you will need to
implement a custom meter. Note that I would generally discourage this as
it will not work properly with some metric systems which assume rates to
be per-second.
On 27/07/2020 19:59, Vijay Balakrishnan wrote:
Hi Al,
I am looking at the Custom User Metrics to count incoming records by
an incoming attribute, event_name, and aggregate them over 5 secs.
I looked at
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter.
I am trying to figure out which one to use, Counter or Meter.
If using Counter, how do I reset it after 5 secs?
If using Meter, which measures avg throughput, how do I specify a
duration like 5 secs? markEvent(long n)?
I am also trying to collect total count of events across all TaskManagers.
Do I collect at
flink_taskmanager_job_task_<customMetricName>_numrecordsIn or
flink_taskmanager_job_task_operator_<customMetricName>_numrecordsIn
(i.e., at the task or operator level)?
Or should I use User variables like below:
counter = getRuntimeContext()
    .getMetricGroup()
    .addGroup("MyMetricsKey", "MyMetricsValue") // specify my value for each custom event_name here - I might not know all custom event_names in advance
    .counter("myCounter");
Pardon my confusion here.
TIA,
On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan
<bvija...@gmail.com> wrote:
Hi David,
Thanks for your reply.
I am already using the PrometheusReporter. I am trying to figure
out how to dig into the incoming application data, count records
grouped by an attribute called event_name, and report that to
Grafana via Prometheus.
I see the following at a high level
task_numRecordsIn
task_numRecordsOut
..operator_numLateRecordsDropped
Trying to dig deeper than this numRecordsIn to get counts grouped by
the event_name attribute coming in the input record every 5 secs.
TIA,
On Sat, Jul 25, 2020 at 10:55 AM David Anderson
<da...@alpinegizmo.com> wrote:
Setting up a Flink metrics dashboard in Grafana requires
setting up and configuring one of Flink's metrics reporters
[1] that is supported by Grafana as a data source. That means
your options for a metrics reporter are Graphite, InfluxDB,
Prometheus, or the Prometheus push reporter.
If you want reporting every 5 seconds, with the push-based
reporters that's something you would configure in
flink-conf.yaml, whereas with Prometheus you'll need to
configure the scrape interval in the prometheus config file.
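For example (values are only illustrative), the interval for a
push-based reporter like Graphite goes into flink-conf.yaml:

    metrics.reporters: grph
    metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
    metrics.reporter.grph.host: localhost
    metrics.reporter.grph.port: 2003
    metrics.reporter.grph.interval: 5 SECONDS

while for Prometheus you'd put something like this in prometheus.yml
(9249 is the PrometheusReporter's default port):

    scrape_configs:
      - job_name: 'flink'
        scrape_interval: 5s
        static_configs:
          - targets: ['localhost:9249']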
For more on using Flink with Prometheus, see the blog post by
Maximilian Bode [2].
Best,
David
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
[2]
https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html
On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan
<bvija...@gmail.com> wrote:
Hi,
I am trying to figure out how many records came into the
Flink App from KDS and how many records got moved to the
next step or were dropped by the watermarks.
I see in the UI table a total for *Source: Records Sent*, and for
the next step, the *Filter->FlatMap* operator, a *Records
Received* total. How can I get these metric values to display in
Grafana? For example, I want to know, for each 5 secs, how many
records came in and how many were filtered out by the watermark
or my custom Filter operator, etc.?
I looked at the breakdown of the Source__Custom_Source in
Metrics as shown in the attached pic. It has values like
0.NumRecordsIn and 0.NumRecordsOut and so on from 0 to 9
for the parallelism of 10 I specified. It also has various
breakdowns like 0.Timestamps/Watermarks.numRecordsIn and
0.Timestamps/Watermarks.numRecordsOut.
Attached are some screenshots of the Flink Dashboard UI.
TIA,