[ https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15213062#comment-15213062 ]
The Data Lorax commented on KAFKA-3456:
---------------------------------------

Yep, increasing the number of samples certainly helps reduce the error. Likewise, with the n+1 model, increasing the samples reduces the latency.

I think the issue is not around the variable time period so much, but the fact that any data captured in the current sample after the periodic observation is never reported, as it's evicted before the next observation. Consider a Count metric with 2 samples covering a 60s period that is being incremented every second. If we observe the metric halfway through a sample then the count will be 45, not 60, and it will continue to be 45 if we keep observing the metric every minute.

For me, I'd prefer correct delayed metrics instead of timely metrics I can't trust. And I'd prefer to be able to reduce the delay by using more samples than to be able to reduce, but not remove, the error.

> In-house KafkaMetric misreports metrics when periodically observed
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3456
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3456
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, core, producer
>    Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>            Reporter: The Data Lorax
>            Assignee: Neha Narkhede
>            Priority: Minor
>
> The metrics captured by Kafka through the in-house {{SampledStat}} are
> misreported if observed in a periodic manner.
> Consider a {{Rate}} metric that is using the default 2 samples and 30 second
> sample window i.e. the {{Rate}} is capturing 60 seconds worth of data. So,
> to report this metric to some external system we might poll it every 60
> seconds to observe the current value. Using a shorter period would, in the
> case of a {{Rate}}, lead to smoothing of the plotted data, and worse, in the
> case of a {{Count}}, would lead to double counting - so 60 seconds is the
> only period at which we can poll the metrics if we are to report accurate
> metrics.
> To demonstrate the issue consider the following somewhat extreme case:
> The {{Rate}} is capturing data from a system which alternates between a 999
> per sec rate and a 1 per sec rate every 30 seconds, with the different rates
> aligned with the sample boundaries within the {{Rate}} instance i.e. after 60
> seconds the first sample within the {{Rate}} instance will have a rate of 999
> per sec, and the second 1 per sec.
> If we were to ask the metric for its value at this 60 second boundary it
> would correctly report 500 per sec. However, if we asked it again 1
> millisecond later it would report 1 per sec, as the first sample window has
> been aged out. Depending on how far into the 60 sec period of the metric
> our periodic poll of the metric fell, we would observe a constant rate
> somewhere in the range of 1 to 500 per second, most likely around the 250
> mark.
> Other metrics based off of the {{SampledStat}} type suffer from the same
> issue e.g. the {{Count}} metric, given a constant rate of 1 per second, will
> report a constant count somewhere between 30 and 60, rather than the correct
> 60.
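To make the eviction effect concrete, here is a minimal sketch of a two-sample count polled once a minute. It is a simplified stand-in and not Kafka's {{SampledStat}}: the class `WindowedCountSketch` and the helper `pollEveryMinute` are hypothetical names, and the sketch assumes time-aligned buckets where a sample expires once it is a full 60 seconds old.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of a sampled count. NOT Kafka's SampledStat; names are hypothetical.
class WindowedCountSketch {
    private final int samples;
    private final long windowMs;
    private final long[] starts;   // start time of the sample in each slot, -1 if unused
    private final double[] values; // total recorded into each sample

    WindowedCountSketch(int samples, long windowMs) {
        this.samples = samples;
        this.windowMs = windowMs;
        this.starts = new long[samples];
        this.values = new double[samples];
        Arrays.fill(starts, -1L);
    }

    void record(double value, long timeMs) {
        long bucket = timeMs / windowMs;
        int idx = (int) (bucket % samples);
        long start = bucket * windowMs;
        if (starts[idx] != start) { // slot is reused, so the old sample is evicted
            starts[idx] = start;
            values[idx] = 0.0;
        }
        values[idx] += value;
    }

    double measure(long nowMs) {
        double total = 0.0;
        for (int i = 0; i < samples; i++) {
            // A sample only counts while it is younger than samples * windowMs.
            boolean live = starts[i] >= 0 && nowMs - starts[i] < samples * windowMs;
            if (live) {
                total += values[i];
            }
        }
        return total;
    }

    // Records 1 per second and observes every 60s, offsetMs into the period.
    // offsetMs is assumed to be a multiple of 1000 in the range [0, 60000).
    static List<Double> pollEveryMinute(long offsetMs, int polls) {
        WindowedCountSketch count = new WindowedCountSketch(2, 30_000L);
        List<Double> observed = new ArrayList<>();
        for (long t = 0; observed.size() < polls; t += 1_000L) {
            if (t >= 60_000L && (t - offsetMs) % 60_000L == 0) {
                observed.add(count.measure(t));
            }
            count.record(1.0, t);
        }
        return observed;
    }
}
```

Under these assumptions, `WindowedCountSketch.pollEveryMinute(15_000L, 4)` sees one complete sample (30) plus half of the current one (15) on every poll. The other half of each current sample is evicted before the next poll and never shows up.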
> This can be seen in the following test code:
> {code:java}
> import static org.junit.Assert.assertEquals;
> import static org.junit.Assert.assertTrue;
>
> import java.util.concurrent.TimeUnit;
>
> import org.apache.kafka.common.metrics.MetricConfig;
> import org.apache.kafka.common.metrics.stats.Count;
> import org.apache.kafka.common.metrics.stats.Rate;
> import org.junit.Before;
> import org.junit.Test;
>
> public class MetricsTest {
>     private MetricConfig metricsConfig;
>
>     @Before
>     public void setUp() throws Exception {
>         metricsConfig = new MetricConfig();
>     }
>
>     // Start time, in ms, of the given sample window (bucket).
>     private long t(final int bucket) {
>         return metricsConfig.timeWindowMs() * bucket;
>     }
>
>     @Test
>     public void testHowRateDropsMetrics() throws Exception {
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>         // First sample window from t0 -> (t1 - 1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t1 -> (t2 - 1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         // Measure at bucket boundary (though the same issue exists for all
>         // periodic measurements):
>         final double m1 = rate.measure(metricsConfig, t(2)); // m1 = 1.0
>         // Third sample window from t2 -> (t3 - 1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Fourth sample window from t3 -> (t4 - 1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         // Measure second pair of samples:
>         final double m2 = rate.measure(metricsConfig, t(4)); // m2 = 1.0
>         assertEquals("Measurement of the rate over the first two samples",
>                 500.0, m1, 2.0);
>         assertEquals("Measurement of the rate over the last two samples",
>                 500.0, m2, 2.0);
>     }
>
>     @Test
>     public void testHowRateDropsMetricsWithRetardedObservations() throws Exception {
>         final long retardation = 1000;
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>         // First sample window from t0 -> (t1 - 1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t1 -> (t2 - 1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         double m1 = 0.0;
>         // Third sample window from t2 -> (t3 - 1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>             if (time == t(2) + retardation) {
>                 m1 = rate.measure(metricsConfig, time); // m1 = 65.something
>             }
>         }
>         // Fourth sample window from t3 -> (t4 - 1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         double m2 = 0.0;
>         // Fifth sample window from t4 -> (t5 - 1), with rate 999 per second:
>         for (long time = t(4); time != t(5); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>             if (time == t(4) + retardation) {
>                 m2 = rate.measure(metricsConfig, time); // m2 = 65.something
>             }
>         }
>         assertTrue("Measurement of the rate over the first two samples should"
>                 + " be well over 250 per sec", m1 > 250.0);
>         assertTrue("Measurement of the rate over the last two samples should"
>                 + " be well over 250 per sec", m2 > 250.0);
>     }
>
>     @Test
>     public void testHowCountDropsMetrics() throws Exception {
>         Count count = new Count();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>         // Record count for constant rate of 1 per second over the entire
>         // 60 second period:
>         for (long time = t(0); time != t(2); time += 1000) {
>             count.record(metricsConfig, 1, time);
>         }
>         final double m1 = count.measure(metricsConfig, t(2) - 1); // m1 = 60
>         count.record(metricsConfig, 1, t(2));
>         final double m2 = count.measure(metricsConfig, t(2)); // m2 = 31
>         assertEquals("Measurement of the count at end of the 60 second period",
>                 60, m1, 0.1);
>         assertEquals("Measurement of the count at 1ms after the 60 second period",
>                 60, m2, 0.1);
>     }
> }
> {code}
> I'm happy to work on the solution to this, but first want to check I've not
> missed something and I'm not doing something stupid. I think my reasoning is
> sound.
> One solution would be to keep 'n + 1' samples, and only combine the complete
> 'n' samples when asked for a value. While this would provide the correct
> metrics, such metrics would respond in a delayed manner to any change, with
> the maximum delay being the configured time of 1 sample, e.g., by default 30s
> maximum and 15s average delay.
> Which is better: correct delayed stats or incorrect responsive stats? Tough
> question! Depends on the use-case for the stats - which I've noticed includes
> internal quotas. Personally, I'd err towards correct stats. But I'm
> interested to hear others' thoughts...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
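As a rough illustration of the 'n + 1' idea described in the issue, the sketch below keeps one extra sample slot and only sums the n most recent complete samples. It is an assumption-laden model, not a patch against Kafka: `CompleteSamplesCountSketch` and `pollEveryMinute` are hypothetical names, and buckets are assumed to be aligned to multiples of the window size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the 'n + 1' proposal. NOT Kafka code; names are hypothetical.
class CompleteSamplesCountSketch {
    private final int samples;     // n: number of complete samples to combine
    private final long windowMs;
    private final long[] buckets;  // bucket number held in each slot, -1 if unused
    private final double[] values; // total recorded into each bucket

    CompleteSamplesCountSketch(int samples, long windowMs) {
        this.samples = samples;
        this.windowMs = windowMs;
        this.buckets = new long[samples + 1]; // one extra slot for the in-progress sample
        this.values = new double[samples + 1];
        Arrays.fill(buckets, -1L);
    }

    void record(double value, long timeMs) {
        long bucket = timeMs / windowMs;
        int idx = (int) (bucket % buckets.length);
        if (buckets[idx] != bucket) { // slot is reused for a new bucket
            buckets[idx] = bucket;
            values[idx] = 0.0;
        }
        values[idx] += value;
    }

    double measure(long nowMs) {
        long current = nowMs / windowMs; // the in-progress bucket is excluded
        double total = 0.0;
        for (int i = 0; i < buckets.length; i++) {
            boolean complete = buckets[i] >= 0
                    && buckets[i] < current
                    && buckets[i] >= current - samples;
            if (complete) {
                total += values[i];
            }
        }
        return total;
    }

    // Records 1 per second and observes every 60s, offsetMs into the period.
    // offsetMs is assumed to be a multiple of 1000 in the range [0, 60000).
    static List<Double> pollEveryMinute(long offsetMs, int polls) {
        CompleteSamplesCountSketch count = new CompleteSamplesCountSketch(2, 30_000L);
        List<Double> observed = new ArrayList<>();
        for (long t = 0; observed.size() < polls; t += 1_000L) {
            if (t >= 60_000L && (t - offsetMs) % 60_000L == 0) {
                observed.add(count.measure(t));
            }
            count.record(1.0, t);
        }
        return observed;
    }
}
```

With a constant 1 per second input, every poll in this model returns 60 regardless of the offset. The cost is the delay the issue mentions: data in the in-progress sample is not visible until that sample completes.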