If you do the aggregation in Prometheus I would think that you do not
need to reset the counter; but it's been a while since I've used it.
Flink will not automatically reset counters.
If this is necessary then you will have to manually reset the counter
every 5 seconds.
The name under which it will be exposed to Prometheus depends on the
configured scope format; see the metric documentation for details.
By default it will contain information about the task executor, job,
task, etc.
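For reference, a scope format is configured in flink-conf.yaml and looks roughly like the sketch below (the exact default varies between Flink versions; check the metrics documentation for your release). Each <variable> is expanded at runtime and becomes part of the metric name exposed to Prometheus:

```yaml
# flink-conf.yaml -- sketch of an operator-level scope format; the
# expanded variables form the prefix of every operator metric name.
metrics.scope.operator: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
```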
On 30/07/2020 22:02, Vijay Balakrishnan wrote:
Hi David,
Thx for your reply.
To summarize:
Use a Counter:
counter = getRuntimeContext()
    .getMetricGroup()
    .addGroup("MyMetricsKey", "MyMetricsValue") // specify my value for each
                                                // custom event_name here - I might
                                                // not know all custom event_names
                                                // in advance
    .counter("myCounter");
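Since the event_names are not all known in advance, one option is to create a counter lazily the first time each name shows up. The sketch below shows just that lazy-registration pattern in plain Java, using a long[] cell as a stand-in for Flink's Counter; in the real job you would instead call getRuntimeContext().getMetricGroup().addGroup(...).counter(...) inside the computeIfAbsent:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: one counter per event_name, created on first sight of the name.
class EventCounters {
    private final Map<String, long[]> counters = new HashMap<>();

    // Increment the counter for this event name, creating it if needed.
    void inc(String eventName) {
        counters.computeIfAbsent(eventName, k -> new long[1])[0]++;
    }

    long count(String eventName) {
        long[] c = counters.get(eventName);
        return c == null ? 0L : c[0];
    }
}
```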
This MyMetricsValue will show up in Prometheus as, for eg:
0.Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter
and so on for 1.Window(TumblingWindow...)... for each parallel operator.
This will then have to be aggregated in Prometheus over 5 secs across all
<parallelismCount> instances of
Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter
// no task executors here - this is at the operator level ???
This is independent of task executors, right ?? How does your statement -
"Flink does not support aggregating operator-level metrics across task
executors. This job is left to proper time-series databases." - relate to
the summary above from me?
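The cross-subtask aggregation could then be done on the Prometheus side with queries along these lines. Both metric names here are hypothetical - the real name depends on the configured scope format - and rate() needs a scrape interval short enough to land at least two samples inside the window:

```promql
# Sum the counter across all parallel subtasks (hypothetical metric name):
sum(flink_taskmanager_job_task_operator_myCounter)

# Per-second rate over the last minute, aggregated across subtasks:
sum(rate(flink_taskmanager_job_task_operator_myCounter[1m]))
```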
Also, I am assuming that the counter will get reset after every window
interval of 5 secs - or do I need to call counter.dec(counter.getCount())
in the close() method as you showed above?
TIA,
On Wed, Jul 29, 2020 at 2:53 AM Chesnay Schepler <ches...@apache.org
<mailto:ches...@apache.org>> wrote:
I'd recommend doing the aggregation over 5 seconds in
Graphite/Prometheus etc., and exposing a counter in Flink for each
attribute/event_name.
User variables are a good choice for encoding the
attribute/event_name values.
As for your remaining questions:
Flink does not support aggregating operator-level metrics across
task executors. This job is left to proper time-series databases.
A counter can be reset like this: counter.dec(counter.getCount())
You can also create a custom implementation with whatever behavior
you desire.
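The reset trick above can be seen in a standalone sketch. SimpleCounter here is a plain-Java stand-in mimicking Flink's default counter (the real one lives in org.apache.flink.metrics); the point is only that dec(getCount()) brings the count back to zero:

```java
// Minimal stand-in for Flink's default counter implementation.
class SimpleCounter {
    private long count;

    void inc() { count++; }
    void dec(long n) { count -= n; }
    long getCount() { return count; }

    // Reset by subtracting the current count, as suggested in the thread.
    void reset() { dec(getCount()); }
}
```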
The default meter implementation (MeterView) calculates the rate of
events per second based on counts that are periodically gathered
over some time period (usually 1 minute). If you want to calculate
the rate-per-second over the last 5 seconds, then new MeterView(5)
should do the trick.
If you want to have a rate-per-5-seconds, then you will need to
implement a custom meter. Note that I would generally discourage
this as it will not work properly with some metric systems which
assume rates to be per-second.
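To make the MeterView behavior concrete, here is a simplified sketch of the idea: snapshot the counter once per second into a ring buffer and average the delta over the time span. This is an approximation of what Flink does, not its actual code, and update() is called manually here instead of from a background updater thread:

```java
// Sketch of a per-second rate averaged over the last N seconds,
// in the spirit of Flink's MeterView (here N = timeSpanSeconds).
class RateMeter {
    private final long[] samples;     // ring buffer of counter snapshots
    private int idx;                  // position of the newest snapshot
    private long current;             // live counter value
    private final int timeSpanSeconds;

    RateMeter(int timeSpanSeconds) {
        this.timeSpanSeconds = timeSpanSeconds;
        this.samples = new long[timeSpanSeconds + 1];
    }

    void markEvent() { current++; }

    // Called once per second (in Flink, by a background updater).
    void update() {
        idx = (idx + 1) % samples.length;
        samples[idx] = current;
    }

    // Events per second, averaged over the last timeSpanSeconds.
    double getRate() {
        long oldest = samples[(idx + 1) % samples.length];
        return (current - oldest) / (double) timeSpanSeconds;
    }
}
```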
On 27/07/2020 19:59, Vijay Balakrishnan wrote:
Hi All,
I am looking at the Custom User Metrics to count incoming records
by an incoming attribute, event_name, and aggregate it over 5 secs.
I looked at
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter.
I am trying to figure out which one to use: Counter or Meter.
If using Counter, how do I reset it after 5 secs?
If using Meter, which measures avg throughput, how do I specify a
duration like 5 secs ? markEvent(long n) ???
I am also trying to collect total count of events across all
TaskManagers.
Do I collect at
flink_taskmanager_job_task_<customMetricName>_numrecordsIn or
flink_taskmanager_job_task_operator_<customMetricName>_numrecordsIn
?? (i.e., at the task or operator level?)
Or should I use User variables like below:
counter = getRuntimeContext()
    .getMetricGroup()
    .addGroup("MyMetricsKey", "MyMetricsValue") // specify my value for each
                                                // custom event_name here - I might
                                                // not know all custom event_names
                                                // in advance
    .counter("myCounter");
Pardon my confusion here.
TIA,
On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan
<bvija...@gmail.com <mailto:bvija...@gmail.com>> wrote:
Hi David,
Thanks for your reply.
I am already using the PrometheusReporter. I am trying to
figure out how to dig into the application data and count
grouped by an attribute called event_name in the incoming
application data and report to Grafana via Prometheus.
I see the following at a high level
task_numRecordsIn
task_numRecordsOut
..operator_numLateRecordsDropped
Trying to dig in deeper than this numRecordsIn to get counts grouped
by the event_name attribute coming in the input record every 5 secs.
TIA,
On Sat, Jul 25, 2020 at 10:55 AM David Anderson
<da...@alpinegizmo.com <mailto:da...@alpinegizmo.com>> wrote:
Setting up a Flink metrics dashboard in Grafana requires
setting up and configuring one of Flink's metrics
reporters [1] that is supported by Grafana as a data
source. That means your options for a metrics reporter
are Graphite, InfluxDB, Prometheus, or the Prometheus
push reporter.
If you want reporting every 5 seconds, with the push-based
reporters that's something you would configure in
flink-conf.yaml, whereas with Prometheus you'll need to
configure the scrape interval in the prometheus config
file. For more on using Flink with Prometheus, see the
blog post by Maximilian Bode [2].
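For example, a Prometheus scrape config with a 5-second interval might look like the sketch below. The job name and target are assumptions for illustration; 9249 is the PrometheusReporter's default port:

```yaml
# prometheus.yml -- hypothetical scrape config for the Flink reporter
scrape_configs:
  - job_name: 'flink'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9249']  # default PrometheusReporter port
```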
Best,
David
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
[2]
https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html
On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan
<bvija...@gmail.com <mailto:bvija...@gmail.com>> wrote:
Hi,
I am trying to figure out how many records came into
the Flink App from KDS and how many records got moved
to the next step or was dropped by the watermarks.
I see on the UI table *Source: Records Sent* with
a total, and the next step, the *Filter->FlatMap operator,
with a Records Received* total. How can I get these
metric values to display in Grafana, e.g., as
I want to know, for each 5 secs, how many
records came in and how many were filtered out by the
watermark or my custom Filter operator etc ?
I looked at the breakdown of the
Source__Custom_Source in Metrics as shown in the
attached pic. It has values like 0.NumRecordsIn and
0.NumRecordsOut and so on from 0 to 9 for the
parallelism of 10 I specified. It also has various
breakdowns like 0.Timestamps/Watermarks.numRecordsIn
and 0.Timestamps/Watermarks.numRecordsOut.
Attached are some screenshots of the Flink DashBoard UI.
TIA,