[ https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210060#comment-15210060 ]
The Data Lorax commented on KAFKA-3456:
---------------------------------------

[~aauradkar], I see you've made changes in this area - I'd be interested to hear your thoughts...

> In-house KafkaMetric misreports metrics when periodically observed
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3456
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3456
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, core, producer
>    Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>            Reporter: The Data Lorax
>            Assignee: Neha Narkhede
>            Priority: Minor
>
> The metrics Kafka captures through the in-house {{SampledStat}} implementations are misreported when observed periodically.
> Consider a {{Rate}} metric using the default 2 samples and 30-second sample window, i.e. the {{Rate}} captures 60 seconds' worth of data. To report this metric to some external system we might poll it every 60 seconds to observe its current value. Using a shorter period would, in the case of a {{Rate}}, smooth the plotted data and, worse, in the case of a {{Count}}, double-count - so 60 seconds is the only period at which we can poll the metrics if we are to report them accurately.
> To demonstrate the issue, consider the following somewhat extreme case:
> The {{Rate}} is capturing data from a system that alternates between a 999 per sec rate and a 1 per sec rate every 30 seconds, with the rate changes aligned to the sample boundaries within the {{Rate}} instance, i.e. after 60 seconds the first sample within the {{Rate}} instance will hold a rate of 999 per sec, and the second 1 per sec.
> If we ask the metric for its value at this 60-second boundary it would correctly report 500 per sec. However, if we ask again just 1 millisecond later it would report 1 per sec, as the first sample window has been aged out.
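The aging-out behaviour described above can be reproduced with a small standalone simulation. To be clear, this is illustrative code written for this report, not Kafka's actual {{Rate}}/{{SampledStat}} implementation; the {{WindowedRate}} class and its purge rule are assumptions that merely mimic the behaviour described:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative stand-in for a windowed rate metric; NOT Kafka's implementation. */
public class RateAgingDemo {

    static class WindowedRate {
        private final int numSamples;
        private final long windowMs;
        // Each sample is {windowStartMs, accumulatedValue}.
        private final Deque<double[]> samples = new ArrayDeque<>();

        WindowedRate(int numSamples, long windowMs) {
            this.numSamples = numSamples;
            this.windowMs = windowMs;
        }

        void record(double value, long nowMs) {
            purge(nowMs);
            double[] last = samples.peekLast();
            if (last == null || nowMs >= (long) last[0] + windowMs) {
                samples.addLast(new double[]{nowMs, 0.0});
            }
            samples.peekLast()[1] += value;
        }

        double measure(long nowMs) {
            purge(nowMs);
            double total = 0.0;
            for (double[] s : samples) {
                total += s[1];
            }
            // Rate over the time elapsed since the oldest *retained* sample started.
            double elapsedSec = (nowMs - samples.peekFirst()[0]) / 1000.0;
            return total / elapsedSec;
        }

        // Age out any sample whose start lies outside the full retention period.
        private void purge(long nowMs) {
            while (!samples.isEmpty()
                    && nowMs - (long) samples.peekFirst()[0] > (long) numSamples * windowMs) {
                samples.removeFirst();
            }
        }
    }

    public static void main(String[] args) {
        WindowedRate rate = new WindowedRate(2, 30_000);
        for (long t = 0; t < 30_000; t += 1000) rate.record(999, t);   // 999/sec window
        for (long t = 30_000; t < 60_000; t += 1000) rate.record(1, t); // 1/sec window
        System.out.printf("at 60s boundary: %.1f per sec%n", rate.measure(60_000)); // 500.0
        System.out.printf("1ms later:       %.1f per sec%n", rate.measure(60_001)); // ~1.0
    }
}
```

The sketch illustrates the mechanism: the instant the oldest window is purged, up to half the recorded history vanishes from the measurement at once, so the observed value collapses from ~500 to ~1 between two polls a millisecond apart.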
> Depending on how far into the metric's 60-second period our periodic poll happened to fall, we would observe a constant rate somewhere in the range of 1 to 500 per second, most likely around the 250 mark.
> Other metrics built on the {{SampledStat}} type suffer from the same issue, e.g. the {{Count}} metric, given a constant rate of 1 per second, will report a constant count somewhere between 30 and 60, rather than the correct 60.
> This can be seen in the following test code:
> {code:java}
> public class MetricsTest {
>     private MetricConfig metricsConfig;
>
>     @Before
>     public void setUp() throws Exception {
>         metricsConfig = new MetricConfig();
>     }
>
>     private long t(final int bucket) {
>         return metricsConfig.timeWindowMs() * bucket;
>     }
>
>     @Test
>     public void testHowRateDropsMetrics() throws Exception {
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>
>         // First sample window from t0 -> (t1 - 1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t1 -> (t2 - 1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         // Measure at the bucket boundary (though the same issue exists for all periodic measurements):
>         final double m1 = rate.measure(metricsConfig, t(2)); // m1 = 1.0
>
>         // Third sample window from t2 -> (t3 - 1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Fourth sample window from t3 -> (t4 - 1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         // Measure the second pair of samples:
>         final double m2 = rate.measure(metricsConfig, t(4)); // m2 = 1.0
>
>         assertEquals("Measurement of the rate over the first two samples", 500.0, m1, 2.0);
>         assertEquals("Measurement of the rate over the last two samples", 500.0, m2, 2.0);
>     }
>
>     @Test
>     public void testHowRateDropsMetricsWithDelayedObservations() throws Exception {
>         final long observationDelayMs = 1000;
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>
>         // First sample window from t0 -> (t1 - 1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t1 -> (t2 - 1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         double m1 = 0.0;
>         // Third sample window from t2 -> (t3 - 1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>             if (time == t(2) + observationDelayMs) {
>                 m1 = rate.measure(metricsConfig, time); // m1 = 65.something
>             }
>         }
>         // Fourth sample window from t3 -> (t4 - 1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         double m2 = 0.0;
>         // Fifth sample window from t4 -> (t5 - 1), with rate 999 per second:
>         for (long time = t(4); time != t(5); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>             if (time == t(4) + observationDelayMs) {
>                 m2 = rate.measure(metricsConfig, time); // m2 = 65.something
>             }
>         }
>         assertTrue("Measurement of the rate over the first two samples should be well over 250 per sec", m1 > 250.0);
>         assertTrue("Measurement of the rate over the last two samples should be well over 250 per sec", m2 > 250.0);
>     }
>
>     @Test
>     public void testHowCountDropsMetrics() throws Exception {
>         Count count = new Count();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>
>         // Record a constant rate of 1 per second over the entire 60-second period:
>         for (long time = t(0); time != t(2); time += 1000) {
>             count.record(metricsConfig, 1, time);
>         }
>         final double m1 = count.measure(metricsConfig, t(2) - 1); // m1 = 60
>         count.record(metricsConfig, 1, t(2));
>         final double m2 = count.measure(metricsConfig, t(2)); // m2 = 31
>
>         assertEquals("Measurement of the count at the end of the 60 second period", 60, m1, 0.1);
>         assertEquals("Measurement of the count 1ms after the 60 second period", 60, m2, 0.1);
>     }
> }
> {code}
> I'm happy to work on a solution to this, but first want to check that I've not missed something and that I'm not doing something stupid. I think my reasoning is sound.
> One solution would be to keep 'n + 1' samples, and only combine the 'n' complete samples when asked for a value. While this would produce correct metrics, those metrics would respond to any change in a delayed manner, with the maximum delay being the configured length of one sample, e.g. by default a 30s maximum and 15s average delay.
> Which is better: correct delayed stats or incorrect responsive stats? Tough question! It depends on the use-case for the stats - which I've noticed includes internal quotas. Personally, I'd err towards correct stats. But I'm interested to hear others' thoughts...

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)