Paymahn created FLINK-32950:
-------------------------------

             Summary: statsd reporter does not follow spec for counters
                 Key: FLINK-32950
                 URL: https://issues.apache.org/jira/browse/FLINK-32950
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Metrics
            Reporter: Paymahn

The [statsd|https://github.com/statsd/statsd/blob/master/docs/metric_types.md] spec says the following:

> At each flush the current count is sent and reset to 0.

The Flink [statsd reporter|https://github.com/apache/flink/blob/e5f78352a29df0d4dfe0c34369193896e7a1b4be/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L129-L131] does not reset the counter to 0 after each flush. Instead, it reports cumulative values. This does not follow the spec and causes issues with downstream clients that consume these statsd metrics.

One possible fix would be to add the following as a class variable:

{code:java}
protected final Map<Counter, Long> lastKnownCounterValues = new ConcurrentHashMap<>();
{code}

and then modify the {{reportCounter}} function like so:

{code:java}
private void reportCounter(final DMetric metric, final Counter counter) {
    // the statsd protocol says that counters should be set to 0 after flushing
    // https://github.com/statsd/statsd/blob/master/docs/metric_types.md#counting
    // we don't want to actually change the value of the counter because it could have unintended
    // consequences. Instead, we keep track of the last known value and report the delta
    long curCount = counter.getCount();
    long lastKnownCount = this.lastKnownCounterValues.getOrDefault(counter, 0L);
    send(metric.getName(), curCount - lastKnownCount, DMetricType.COUNTER, metric.getTags());
    this.lastKnownCounterValues.put(counter, curCount);
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
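
For illustration, the delta-tracking idea proposed in this issue can be sketched without any Flink dependency. This is only a minimal sketch, not the reporter's actual code: {{CumulativeCounter}}, {{SimpleCounter}} and {{DeltaCounterTracker}} are hypothetical names introduced here, with {{CumulativeCounter}} standing in for Flink's {{Counter}}. The {{forget}} method is likewise an assumption, covering the case where entries should be dropped once a metric is no longer reported.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for Flink's Counter; only getCount() is needed here. */
interface CumulativeCounter {
    long getCount();
}

/** Trivial cumulative counter, used only to exercise the tracker. */
final class SimpleCounter implements CumulativeCounter {
    private long count;

    void inc(long n) {
        count += n;
    }

    @Override
    public long getCount() {
        return count;
    }
}

/** Remembers the last value seen per counter and hands back the change since then. */
final class DeltaCounterTracker {
    private final Map<CumulativeCounter, Long> lastKnown = new ConcurrentHashMap<>();

    /** Returns the increase since the previous call; the counter itself is never modified. */
    long nextDelta(CumulativeCounter counter) {
        long current = counter.getCount();
        // put() returns the previously stored value, or null on the first flush
        Long previous = lastKnown.put(counter, current);
        return previous == null ? current : current - previous;
    }

    /** Drops the stored value so the map does not grow without bound (assumed cleanup hook). */
    void forget(CumulativeCounter counter) {
        lastKnown.remove(counter);
    }
}
```

With a counter that goes 0 -> 5 -> 8 across two flushes, {{nextDelta}} returns 5 and then 3, and a third flush with no change returns 0. In a real reporter, {{forget}} would presumably be called when a metric is unregistered, otherwise the map keeps a reference to every counter ever reported.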