[ https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212514#comment-15212514 ]
Aditya Auradkar commented on KAFKA-3456: ---------------------------------------- Accidentally changed the thread title to gibberish :). Changed it back. I think that the problem (and I'm not convinced it is a problem) is that when you have 2 windows, the rate can change significantly when a new window is created. You effectively throw away half of your samples and start seeding that data again. This can skew measurement. IIUC, the problem isn't that an incorrect rate is being reported, it is simply being reported over a potentially variable interval. Configuring a larger number of samples will reduce the time interval variability and can smooth this significantly. Extending your example, if you have 10 windows (6 seconds each), and you alternate between 999 and 1 req/sec in each of these samples. Your rate over 60 seconds will be 500. If you roll over your first sample of 999, the rate changes to ~450 which seems closer to what you want? > In-house KafkaMetric misreports metrics when periodically observed > ------------------------------------------------------------------ > > Key: KAFKA-3456 > URL: https://issues.apache.org/jira/browse/KAFKA-3456 > Project: Kafka > Issue Type: Bug > Components: consumer, core, producer > Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0 > Reporter: The Data Lorax > Assignee: Neha Narkhede > Priority: Minor > > The metrics captured by Kafka through the in-house {{SampledStat}} suffer > from misreporting metrics if observed in a periodic manner. > Consider a {{Rate}} metric that is using the default 2 samples and 30 second > sample window i.e. the {{Rate}} is capturing 60 seconds worth of data. So, > to report this metric to some external system we might poll it every 60 > seconds to observe the current value. Using a shorter period would, in the > case of a {{Rate}}, lead to smoothing of the plotted data, and worse, in the > case of a {{Count}}, would lead to double counting - so 60 seconds is the > only period at which we can poll the metrics if we are to report accurate > metrics. > To demonstrate the issue consider the following somewhat extreme case: > The {{Rate}} is capturing data from a system which alternates between a 999 > per sec rate and a 1 per sec rate every 30 seconds, with the different rates > aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 > seconds the first sample within the {{Rate}} instance will have a rate of 999 > per sec, and the second 1 per sec. > If we were to ask the metric for its value at this 60 second boundary it > would correctly report 500 per sec. However, if we asked it again 1 > millisecond later it would report 1 per sec, as the first sample window has > been aged out. Depending on how retarded into the 60 sec period of the metric > our periodic poll of the metric was, we would observe a constant rate > somewhere in the range of 1 to 500 per second, most likely around the 250 > mark. > Other metrics based off of the {{SampledStat}} type suffer from the same > issue e.g. the {{Count}} metric, given a constant rate of 1 per second, will > report a constant count somewhere between 30 and 60, rather than the correct > 60. > This can be seen in the following test code: > {code:java} > public class MetricsTest { > private MetricConfig metricsConfig; > @Before > public void setUp() throws Exception { > metricsConfig = new MetricConfig(); > } > private long t(final int bucket) { > return metricsConfig.timeWindowMs() * bucket; > } > @Test > public void testHowRateDropsMetrics() throws Exception { > Rate rate = new Rate(); > metricsConfig.samples(2); > metricsConfig.timeWindow(30, TimeUnit.SECONDS); > // First sample window from t0 -> (t1 -1), with rate 999 per second: > for (long time = t(0); time != t(1); time += 1000) { > rate.record(metricsConfig, 999, time); > } > // Second sample window from t1 -> (t2 -1), with rate 1 per second: > for (long time = t(1); time != t(2); time += 1000) { > rate.record(metricsConfig, 1, time); > } > // Measure at bucket boundary, (though same issue exists all periodic > measurements) > final double m1 = rate.measure(metricsConfig, t(2)); // m1 = 1.0 > // Third sample window from t2 -> (t3 -1), with rate 999 per second: > for (long time = t(2); time != t(3); time += 1000) { > rate.record(metricsConfig, 999, time); > } > // Second sample window from t3 -> (t4 -1), with rate 1 per second: > for (long time = t(3); time != t(4); time += 1000) { > rate.record(metricsConfig, 1, time); > } > // Measure second pair of samples: > final double m2 = rate.measure(metricsConfig, t(4)); // m2 = 1.0 > assertEquals("Measurement of the rate over the first two samples", > 500.0, m1, 2.0); > assertEquals("Measurement of the rate over the last two samples", > 500.0, m2, 2.0); > } > @Test > public void testHowRateDropsMetricsWithRetardedObservations() throws > Exception { > final long retardation = 1000; > Rate rate = new Rate(); > metricsConfig.samples(2); > metricsConfig.timeWindow(30, TimeUnit.SECONDS); > // First sample window from t0 -> (t1 -1), with rate 999 per second: > for (long time = t(0); time != t(1); time += 1000) { > rate.record(metricsConfig, 999, time); > } > // Second sample window from t1 -> (t2 -1), with rate 1 per second: > for (long time = t(1); time != t(2); time += 1000) { > rate.record(metricsConfig, 1, time); > } > double m1 = 0.0; > // Third sample window from t2 -> (t3 -1), with rate 999 per second: > for (long time = t(2); time != t(3); time += 1000) { > rate.record(metricsConfig, 999, time); > if (time == t(2) + retardation) { > m1 = rate.measure(metricsConfig, time); // // m1 = > 65.something > } > } > // Second sample window from t3 -> (t4 -1), with rate 1 per second: > for (long time = t(3); time != t(4); time += 1000) { > rate.record(metricsConfig, 1, time); > } > double m2 = 0.0; > // Fifth sample window from t4 -> (t5 -1), with rate 999 per second: > for (long time = t(4); time != t(5); time += 1000) { > rate.record(metricsConfig, 999, time); > if (time == t(4) + retardation) { > m2 = rate.measure(metricsConfig, time); // m2 = 65.something > } > } > assertTrue("Measurement of the rate over the first two samples should > be well over 250 per sec", m1 > 250.0); > assertTrue("Measurement of the rate over the last two samples should > be well over 250 per sec", m2 > 250.0); > } > @Test > public void testHowCountDropsMetrics() throws Exception { > Count count = new Count(); > metricsConfig.samples(2); > metricsConfig.timeWindow(30, TimeUnit.SECONDS); > // Record count for constant rate of 1 per second over entire 60 > second period: > for (long time = t(0); time != t(2); time += 1000) { > count.record(metricsConfig, 1, time); > } > final double m1 = count.measure(metricsConfig, t(2) -1); // m1 = 60 > count.record(metricsConfig, 1, t(2)); > final double m2 = count.measure(metricsConfig, t(2)); // m2 = 31 > assertEquals("Measurement of the count at end of the 60 second > period", 60, m1, 0.1); > assertEquals("Measurement of the count at 1ms after the 60 second > period", 60, m2, 0.1); > } > } > {code} > I'm happy to work on the solution to this, but first want to check I've not > missed something and I'm not doing something stupid. I think my reasoning is > sound. > One solution would be to keep 'n + 1' samples, and only combine the complete > 'n' samples when asked for a value. While this would provide the correct > metrics, such metrics would respond in a delayed manner to any change, with > the maximum delay being the configured time of 1 sample, e.g, by default 30s > maximum and 15s average delay. > Which is better: correct delayed stats or incorrect responsive stats? Tough > question! Depends on the use-case for the stats - which I've noticed includes > internal quotas. Personally, I'd err towards correct stats. But I'm > interested to hear others thoughts... -- This message was sent by Atlassian JIRA (v6.3.4#6332)