Ah ok, the onTimer() and processElement() methods are both protected by synchronized blocks on the same lock, so that shouldn’t be a problem.
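For completeness: if a metric were ever updated from threads that do not share that lock, a minimal thread-safe counter along the lines Chesnay suggested could look like the (untested) sketch below. The class name ThreadSafeCounter is made up; only the org.apache.flink.metrics.Counter interface is Flink’s. It could then be registered with getRuntimeContext().getMetricGroup().counter("myCounter", new ThreadSafeCounter()).

import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.metrics.Counter;

/** Sketch: a Counter whose updates are safe across threads (backed by an AtomicLong). */
public class ThreadSafeCounter implements Counter {

    private final AtomicLong count = new AtomicLong();

    @Override
    public void inc() {
        count.incrementAndGet();
    }

    @Override
    public void inc(long n) {
        count.addAndGet(n);
    }

    @Override
    public void dec() {
        count.decrementAndGet();
    }

    @Override
    public void dec(long n) {
        count.addAndGet(-n);
    }

    @Override
    public long getCount() {
        return count.get();
    }
}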
> On 22. May 2017, at 15:08, Chesnay Schepler <ches...@apache.org> wrote:
>
> Yes, that could cause the observed issue.
>
> The default implementations are not thread-safe; if you do concurrent writes
> they may be lost/overwritten.
> You will have to either guard accesses to that metric with a synchronized
> block or implement your own thread-safe counter.
>
> On 22.05.2017 14:17, Aljoscha Krettek wrote:
>> @Chesnay With timers it will happen that onTimer() is called from a
>> different Thread than the Thread that is calling processElement(). If
>> Metrics updates happen in both, would that be a problem?
>>
>>> On 19. May 2017, at 11:57, Chesnay Schepler <ches...@apache.org> wrote:
>>>
>>> 2. isn't quite accurate actually; metrics on the TaskManager are not
>>> persisted across restarts.
>>>
>>> On 19.05.2017 11:21, Chesnay Schepler wrote:
>>>> 1. This shouldn't happen. Do you access the counter from different threads?
>>>>
>>>> 2. Metrics in general are not persisted across restarts, and there is
>>>> currently no way to configure Flink to do so.
>>>>
>>>> 3. Counters are sent as gauges since, as far as I know, StatsD counters
>>>> are not allowed to be decremented.
>>>>
>>>> On 19.05.2017 08:56, jaxbihani wrote:
>>>>> Background: We are running a job using ProcessFunction which reads data
>>>>> from Kafka, fires ~5-10K timers per second, and sends matched events to a
>>>>> KafkaSink. We collect metrics such as the number of active timers, the
>>>>> number of timers scheduled, etc. We use the StatsD reporter, monitor via a
>>>>> Grafana dashboard, and use RocksDBStateBackend backed by HDFS as the state
>>>>> backend.
>>>>>
>>>>> Observations/Problems:
>>>>> 1. *Counter value suddenly got reset:* While the job was running fine, at
>>>>> one moment the metric of a monotonically increasing counter (a counter on
>>>>> which we only call inc()) suddenly dropped to 0 and then resumed from
>>>>> there. The only exceptions in the logs were related to transient
>>>>> connectivity issues to datanodes. There was also no other indicator of any
>>>>> failure after inspecting system metrics/checkpoint metrics. It happened
>>>>> just once across multiple runs of the same job.
>>>>> 2. *Counters not retained across a Flink restart with savepoint:*
>>>>> Cancelled the job with the -s option, taking a savepoint, and then
>>>>> restarted the job from that savepoint. After the restart, metrics started
>>>>> from 0. I was expecting the metric value of a given operator to also be
>>>>> part of its state.
>>>>> 3. *Counter metrics sent as gauges:* Using tcpdump I inspected the format
>>>>> in which metrics are sent to StatsD. I observed that even the metrics
>>>>> which are counters in my code were sent as gauges. I don't see why that
>>>>> is.
>>>>>
>>>>> Can anyone add more insight into why the above behaviors might have
>>>>> happened?
>>>>> Also, does Flink store metric values as part of the state of stateful
>>>>> operators? Is there any way to configure that?
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-metrics-related-problems-questions-tp13218.html
>>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>>> archive at Nabble.com.
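Regarding 2. (counters not retained across a restart with savepoint): since Flink does not persist metric values itself, one workaround is to keep the count in operator state and expose it as a gauge, so the value is restored together with the savepoint. A rough, untested sketch follows; MatchingFunction, the metric name matchedEvents, and the state name matchedCount are made-up names for illustration.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Sketch: keep a metric value in operator state so it survives a savepoint/restore. */
public class MatchingFunction extends ProcessFunction<String, String>
        implements CheckpointedFunction {

    private transient ListState<Long> countState; // operator state holding the count
    private long matchedCount;                    // restored in initializeState()

    @Override
    public void open(Configuration parameters) {
        // Expose the (possibly restored) value as a gauge; Flink itself starts
        // every metric from zero after a restart.
        getRuntimeContext().getMetricGroup()
                .gauge("matchedEvents", (Gauge<Long>) () -> matchedCount);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        matchedCount++; // processElement() and onTimer() run under the same lock
        out.collect(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Store the current count as operator state on every checkpoint/savepoint.
        countState.clear();
        countState.add(matchedCount);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        countState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("matchedCount", Long.class));
        if (context.isRestored()) {
            // Sum the restored pieces (there may be several after a rescale).
            for (Long c : countState.get()) {
                matchedCount += c;
            }
        }
    }
}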