NicoK commented on a change in pull request #8888: [FLINK-12983][metrics] replace descriptive histogram's storage back-end
URL: https://github.com/apache/flink/pull/8888#discussion_r315374624
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java

```diff
@@ -27,27 +27,63 @@
  */
 public class DescriptiveStatisticsHistogram implements org.apache.flink.metrics.Histogram {
 
-	private final DescriptiveStatistics descriptiveStatistics;
-
-	private long elementsSeen = 0L;
+	private final CircularDoubleArray descriptiveStatistics;
 
 	public DescriptiveStatisticsHistogram(int windowSize) {
-		this.descriptiveStatistics = new DescriptiveStatistics(windowSize);
+		this.descriptiveStatistics = new CircularDoubleArray(windowSize);
 	}
 
 	@Override
 	public void update(long value) {
-		elementsSeen += 1L;
 		this.descriptiveStatistics.addValue(value);
 	}
 
 	@Override
 	public long getCount() {
-		return this.elementsSeen;
+		return this.descriptiveStatistics.getElementsSeen();
 	}
 
 	@Override
 	public HistogramStatistics getStatistics() {
 		return new DescriptiveStatisticsHistogramStatistics(this.descriptiveStatistics);
 	}
+
+	/**
+	 * Fixed-size array that wraps around at the end and has a dynamic start position.
+	 */
+	static class CircularDoubleArray {
+		private final double[] backingArray;
+		private int nextPos = 0;
+		private boolean fullSize = false;
+		private long elementsSeen = 0;
+
+		CircularDoubleArray(int windowSize) {
+			this.backingArray = new double[windowSize];
+		}
+
+		synchronized void addValue(double value) {
+			backingArray[nextPos] = value;
+			++elementsSeen;
+			++nextPos;
+			if (nextPos == backingArray.length) {
+				nextPos = 0;
+				fullSize = true;
+			}
+		}
+
+		synchronized double[] toUnsortedArray() {
+			final int size = getSize();
+			double[] result = new double[size];
+			System.arraycopy(backingArray, 0, result, 0, result.length);
```

Review comment: `CircularDoubleArray` is a package-private class, so I'm wondering which component would require a sorted array at all.
This circular array has a dedicated API (`toUnsortedArray()` makes it explicit that the returned array is unsorted) and is only used by `org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics.CommonMetricsSnapshot`, which does not require sorting. Whether sorting is required is therefore decided solely by `CommonMetricsSnapshot`.

Out of curiosity, I added the following method, which returns the values in insertion order (oldest first), and used it instead of `toUnsortedArray()`:

```java
public synchronized double[] toArray() {
	if (fullSize) {
		// The buffer has wrapped: the oldest value sits at nextPos, so copy
		// [nextPos, end) first and then [0, nextPos).
		final double[] result = new double[backingArray.length];
		final int firstLength = result.length - nextPos;
		System.arraycopy(backingArray, nextPos, result, 0, firstLength);
		System.arraycopy(backingArray, 0, result, firstLength, nextPos);
		return result;
	} else {
		// Not wrapped yet: the first nextPos slots are already in insertion order.
		final double[] result = new double[nextPos];
		System.arraycopy(backingArray, 0, result, 0, nextPos);
		return result;
	}
}
```

First quick results show the benefit of not producing a sorted array when we don't need it (and request the histogram a lot):

```
Flink 1.9
Benchmark                                     Mode  Cnt       Score      Error  Units
HistogramBenchmarks.descriptiveHistogram     thrpt   30      89.224 ±    1.974  ops/ms
HistogramBenchmarks.descriptiveHistogramAdd  thrpt   30   56034.903 ±  939.263  ops/ms

Flink 1.10 + FLINK-12983
Benchmark                                     Mode  Cnt       Score      Error  Units
HistogramBenchmarks.descriptiveHistogram     thrpt   30     280.240 ±    3.747  ops/ms
HistogramBenchmarks.descriptiveHistogramAdd  thrpt   30  207176.894 ± 2417.831  ops/ms

Flink 1.10 + FLINK-12983 + sortedArray
Benchmark                                     Mode  Cnt       Score      Error  Units
HistogramBenchmarks.descriptiveHistogram     thrpt   30     239.000 ±    1.506  ops/ms
HistogramBenchmarks.descriptiveHistogramAdd  thrpt   30  210352.180 ± 1135.394  ops/ms
```

With the ordered `toArray()` (the `sortedArray` run), `descriptiveHistogram` throughput drops from 280.240 to 239.000 ops/ms (about 15%). `descriptiveHistogramAdd` is essentially unchanged (the error intervals overlap), which fits the fact that only the read path differs.
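The following minimal standalone sketch makes the difference between the two snapshots concrete. It is not Flink code. The class name `CircularWindowSketch`, the nested `Window` class and the `mean` helper are hypothetical. The diff calls `getSize()` without showing its body, so here it is assumed to return `backingArray.length` once the buffer has wrapped and `nextPos` before that. The storage-order copy mirrors `toUnsortedArray()` and the rotated copy mirrors the `toArray()` method above. Both contain the same multiset of values, so order-insensitive statistics such as min, max, mean, standard deviation and percentiles are mathematically the same either way (up to floating-point rounding in sums).

```java
import java.util.Arrays;

/**
 * Hypothetical standalone sketch (not Flink code) of a fixed-size circular
 * buffer of doubles, comparing a storage-order snapshot with an
 * insertion-ordered (oldest-first) snapshot.
 */
public class CircularWindowSketch {

    static final class Window {
        private final double[] backingArray;
        private int nextPos = 0;          // slot the next value is written to
        private boolean fullSize = false; // true once the buffer has wrapped around
        private long elementsSeen = 0;    // all values ever added, not capped by the window

        Window(int windowSize) {
            this.backingArray = new double[windowSize];
        }

        synchronized void addValue(double value) {
            backingArray[nextPos] = value;
            ++elementsSeen;
            if (++nextPos == backingArray.length) {
                nextPos = 0;
                fullSize = true;
            }
        }

        // Assumption: the diff calls getSize() without showing its body.
        synchronized int getSize() {
            return fullSize ? backingArray.length : nextPos;
        }

        synchronized long getElementsSeen() {
            return elementsSeen;
        }

        // Mirrors toUnsortedArray(): one copy in storage order, which is
        // not oldest-first after the buffer has wrapped.
        synchronized double[] toUnsortedArray() {
            return Arrays.copyOf(backingArray, getSize());
        }

        // Mirrors the toArray() variant: rotates the copy so that the
        // oldest value (at nextPos once wrapped) comes first.
        synchronized double[] toArray() {
            if (!fullSize) {
                return Arrays.copyOf(backingArray, nextPos);
            }
            final double[] result = new double[backingArray.length];
            final int firstLength = result.length - nextPos;
            System.arraycopy(backingArray, nextPos, result, 0, firstLength);
            System.arraycopy(backingArray, 0, result, firstLength, nextPos);
            return result;
        }
    }

    // Hypothetical helper: the mean does not depend on element order
    // (up to floating-point rounding in the sum).
    static double mean(double[] values) {
        if (values.length == 0) {
            return Double.NaN;
        }
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.length;
    }

    public static void main(String[] args) {
        Window window = new Window(4);
        for (int i = 1; i <= 6; i++) {
            window.addValue(i); // 5 and 6 overwrite 1 and 2
        }
        double[] unsorted = window.toUnsortedArray();
        double[] ordered = window.toArray();
        System.out.println(Arrays.toString(unsorted));
        System.out.println(Arrays.toString(ordered));
        System.out.println(mean(unsorted) + " " + mean(ordered));
        System.out.println(window.getElementsSeen());
    }
}
```

Running `main` prints `[5.0, 6.0, 3.0, 4.0]` for the storage-order copy and `[3.0, 4.0, 5.0, 6.0]` for the insertion-ordered copy. It then prints `4.5 4.5` for the two means and `6` for `getElementsSeen()`. Like `getCount()` in the diff, that counter covers every value ever added, not just the 4 currently in the window.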