Hello Vijay,
I have the same use case, where I am reading from Kafka and want to
report a count corresponding to each event every 5 mins. In Prometheus, I
want to set an alert if, for any event, we do not receive any records,
i.e. the count is zero.

Could you please share how you implemented this in the end?

On Fri, Jul 31, 2020 at 2:14 AM Chesnay Schepler <ches...@apache.org> wrote:

> If you do the aggregation in Prometheus I would think that you do not need
> to reset the counter; but it's been a while since I've used it.
> Flink will not automatically reset counters.
> If this is necessary then you will have to manually reset the counter
> every 5 seconds.
>
> The name under which it will be exposed to Prometheus depends on the
> configured scope format; see the metric documentation for details.
> By default it will contain information about the task executors, job,
> task, etc.
>
> On 30/07/2020 22:02, Vijay Balakrishnan wrote:
>
> Hi David,
> Thx for your reply.
>
> To summarize:
> Use a Counter:
>
> counter = getRuntimeContext()
>   .getMetricGroup()
>   // specify my value for each custom event_name here -
>   // I might not know all custom event_names in advance
>   .addGroup("MyMetricsKey", "MyMetricsValue")
>   .counter("myCounter");
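Since the set of event_names is not known in advance, one common pattern is to create counters lazily, one per name, the first time each name is seen. The sketch below is plain Java with no Flink dependency (`LongAdder` stands in for Flink's `Counter`; in a real job the factory lambda would instead call `metricGroup.addGroup("event_name", name).counter("myCounter")`), so treat it as an illustration of the lookup pattern, not of the exact Flink API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Sketch: lazily register one counter per event_name, since the set of
// names is not known in advance. LongAdder is a stand-in for a Flink
// Counter here; only the computeIfAbsent pattern is the point.
class PerEventCounters {
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    // Called once per incoming record.
    void count(String eventName) {
        counters.computeIfAbsent(eventName, n -> new LongAdder()).increment();
    }

    long get(String eventName) {
        LongAdder c = counters.get(eventName);
        return c == null ? 0L : c.sum();
    }
}
```

With counters registered this way, each distinct event_name shows up as its own labeled series that Prometheus can then sum over.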
>
> This MyMetricsValue will show up in Prometheus as, e.g.:
> 0.Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter
> and likewise 1.Window(TumblingWindow...)... and so on, one per parallel
> operator instance.
>
> These will then have to be aggregated in Prometheus over 5 secs across
> all <parallelismCount> parallel instances.
>
> Window(TumblingWindow...)__EventTimeTrigger_MyClass.myMetricsKey.myCounter   
> // no task executors here - this is at Operator level ???
>
> This is independent of task executors, right? How does your statement,
> "Flink does not support aggregating operator-level metrics across task
> executors; this job is left to proper time-series databases", relate to
> my summary above?
>
> Also, I am assuming that the Counter will get reset after every window
> interval of 5 secs; or do I need to do counter.dec(counter.getCount()) in
> the close() method, as you showed above?
>
> TIA,
>
>
>
> On Wed, Jul 29, 2020 at 2:53 AM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> I'd recommend doing the aggregation over 5 seconds in Graphite/Prometheus,
>> etc., and exposing a counter in Flink for each attribute/event_name.
>>
>> User variables are a good choice for encoding the attribute/event_name
>> values.
>>
>> As for your remaining questions:
>>
>> Flink does not support aggregating operator-level metrics across task
>> executors. This job is left to proper time-series databases.
>>
>> A counter can be reset like this: counter.dec(counter.getCount())
>> You can also create a custom implementation with whatever behavior you
>> desire.
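The reset idiom above can be made a bit safer by combining the read and the reset into one atomic step, so no increments are lost between `getCount()` and `dec()`. Below is a minimal plain-Java sketch (no Flink dependency; the `inc`/`dec`/`getCount` names mirror Flink's Counter interface, but this is not the Flink class itself):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a counter that supports an atomic read-and-reset, the
// "custom implementation with whatever behavior you desire" option.
class ResettableCounter {
    private final AtomicLong count = new AtomicLong();

    void inc() { count.incrementAndGet(); }
    void dec(long n) { count.addAndGet(-n); }
    long getCount() { return count.get(); }

    // Equivalent in effect to counter.dec(counter.getCount()), but done
    // in one atomic step: returns the value seen and resets to zero.
    long getAndReset() { return count.getAndSet(0L); }
}
```

A periodic task (e.g. every 5 seconds) would call `getAndReset()`, report the value, and leave the counter at zero for the next interval.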
>>
>> The default meter implementation (MeterView) calculates the rate of events
>> per second based on counts that are periodically gathered over some
>> time period (usually 1 minute). If you want to calculate the
>> rate-per-second over the last 5 seconds, then new MeterView(5) should do
>> the trick.
>> If you want to have a rate-per-5-seconds, then you will need to implement
>> a custom meter. Note that I would generally discourage this as it will not
>> work properly with some metric systems which assume rates to be per-second.
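To make the MeterView mechanism concrete, here is a self-contained plain-Java sketch of the same idea: counts are sampled into a small ring buffer once per second, and the per-second rate is the difference between the newest and oldest sample divided by the window length. This mirrors how Flink's MeterView works as I understand it, but it is an illustration, not the actual Flink class; in Flink a timer thread drives `update()`, whereas here it is called manually:

```java
// Sketch of a MeterView-style rate over the last 5 seconds.
// samples has WINDOW + 1 slots so that after writing the newest sample,
// the next slot still holds the sample from WINDOW updates ago.
class FiveSecondRate {
    private static final int WINDOW = 5; // seconds
    private final long[] samples = new long[WINDOW + 1];
    private int idx = 0;
    private long count = 0;
    private double rate = 0.0;

    void markEvent() { count++; }

    // Call once per second (Flink's reporter timer would do this).
    void update() {
        idx = (idx + 1) % samples.length;
        samples[idx] = count;
        long oldest = samples[(idx + 1) % samples.length];
        rate = (count - oldest) / (double) WINDOW;
    }

    double getRate() { return rate; }
}
```

Note the result is still a rate *per second* (averaged over the last 5 seconds), which is what most metric systems expect, per the caveat above about per-second rates.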
>>
>> On 27/07/2020 19:59, Vijay Balakrishnan wrote:
>>
>> Hi All,
>> I am looking at the custom user metrics to count incoming records by an
>> incoming attribute, event_name, and aggregate it over 5 secs.
>> I looked at
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
>> .
>> I am trying to figure out which one to use: Counter or Meter.
>> If using Counter, how do I reset it after 5 secs?
>> If using Meter, which measures avg throughput, how do I specify a
>> duration like 5 secs? markEvent(long n)???
>>
>> I am also trying to collect total count of events across all TaskManagers.
>> Do I collect at
>> flink_taskmanager_job_task_<customMetricName>_numrecordsIn  or
>> flink_taskmanager_job_task_operator_<customMetricName>_numrecordsIn  ??
>> (i.e. at the task level or the operator level)?
>>
>> Or should I use User variables like below:
>>
>> counter = getRuntimeContext()
>>   .getMetricGroup()
>>   // specify my value for each custom event_name here -
>>   // I might not know all custom event_names in advance
>>   .addGroup("MyMetricsKey", "MyMetricsValue")
>>   .counter("myCounter");
>>
>>
>> Pardon my confusion here.
>> TIA,
>>
>> On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Hi David,
>>> Thanks for your reply.
>>> I am already using the PrometheusReporter. I am trying to figure out how
>>> to dig into the application data and count grouped by an attribute called
>>> event_name in the incoming application data and report to Grafana via
>>> Prometheus.
>>>
>>> I see the following at a high level
>>> task_numRecordsIn
>>> task_numRecordsOut
>>> ..operator_numLateRecordsDropped
>>>
>>> Trying to dig in deeper than this numRecordsIn to get counts grouped by
>>> the event_name attribute coming in the input record every 5 secs.
>>> TIA,
>>>
>>> On Sat, Jul 25, 2020 at 10:55 AM David Anderson <da...@alpinegizmo.com>
>>> wrote:
>>>
>>>> Setting up a Flink metrics dashboard in Grafana requires setting up and
>>>> configuring one of Flink's metrics reporters [1] that is supported by
>>>> Grafana as a data source. That means your options for a metrics reporter
>>>> are Graphite, InfluxDB, Prometheus, or the Prometheus push reporter.
>>>>
>>>> If you want reporting every 5 seconds, with the push based reporters
>>>> that's something you would configure in flink-conf.yaml, whereas with
>>>> Prometheus you'll need to configure the scrape interval in the prometheus
>>>> config file. For more on using Flink with Prometheus, see the blog post by
>>>> Maximilian Bode [2].
>>>>
>>>> Best,
>>>> David
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
>>>> [2]
>>>> https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html
>>>>
>>>> On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan <bvija...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I am trying to figure out how many records came into the Flink app
>>>>> from KDS and how many records got moved to the next step or were
>>>>> dropped by the watermarks.
>>>>>
>>>>> In the UI table I see *Source: Records Sent* with a total, and the
>>>>> next step, the *Filter->FlatMap* operator, with a *Records Received*
>>>>> total. How can I get these metric values to display in Grafana? E.g.,
>>>>> for each 5 secs, I want to know how many records came in and how many
>>>>> were filtered out by the watermark or my custom Filter operator, etc.
>>>>>
>>>>> I looked at the breakdown of the Source__Custom_Source in Metrics as
>>>>> shown in the attached pic. It has values like 0.NumRecordsIn and
>>>>> 0.NumRecordsOut and so on from 0 to 9 for the parallelism 10 I specified.
>>>>> It also has various breakdowns like 0.Timestamps/Watermarks.numRecordsIn
>>>>> and 0.Timestamps/Watermarks.numRecordsOut
>>>>>
>>>>> Attached are some screenshots of the Flink DashBoard UI.
>>>>>
>>>>> TIA,
>>>>>
>>>>>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07