[ 
https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210060#comment-15210060
 ] 

The Data Lorax commented on KAFKA-3456:
---------------------------------------

[~aauradkar], I see you've made changes in this area - would be interested to 
hear your thoughts...

> In-house KafkaMetric misreports metrics when periodically observed
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3456
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3456
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, core, producer 
>    Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>            Reporter: The Data Lorax
>            Assignee: Neha Narkhede
>            Priority: Minor
>
> The metrics captured by Kafka through the in-house {{SampledStat}} suffer 
> from misreporting metrics if observed in a periodic manner.
> Consider a {{Rate}} metric that is using the default 2 samples and 30 second 
> sample window i.e. the {{Rate}} is capturing 60 seconds worth of data.  So, 
> to report this metric to some external system we might poll it every 60 
> seconds to observe the current value. Using a shorter period would, in the 
> case of a {{Rate}}, lead to smoothing of the plotted data, and worse, in the 
> case of a {{Count}}, would lead to double counting - so 60 seconds is the 
> only period at which we can poll the metrics if we are to report accurate 
> metrics.
> To demonstrate the issue consider the following somewhat extreme case:
> The {{Rate}}  is capturing data from a system which alternates between a 999 
> per sec rate and a 1 per sec rate every 30 seconds, with the different rates 
> aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 
> seconds the first sample within the {{Rate}} instance will have a rate of 999 
> per sec, and the second 1 per sec. 
> If we were to ask the metric for its value at this 60 second boundary it 
> would correctly report 500 per sec. However, if we asked it again 1 
> millisecond later it would report 1 per sec, as the first sample window has 
> been aged out. Depending on how retarded into the 60 sec period of the metric 
> our periodic poll of the metric was, we would observe a constant rate 
> somewhere in the range of 1 to 500 per second, most likely around the 250 
> mark. 
> Other metrics based off of the {{SampledStat}} type suffer from the same 
> issue e.g. the {{Count}} metric, given a constant rate of 1 per second, will 
> report a constant count somewhere between 30 and 60, rather than the correct 
> 60.
> This can be seen in the following test code:
> {code:java}
> public class MetricsTest {
>     private MetricConfig metricsConfig;
>     @Before
>     public void setUp() throws Exception {
>         metricsConfig = new MetricConfig();
>     }
>     private long t(final int bucket) {
>         return metricsConfig.timeWindowMs() * bucket;
>     }
>     @Test
>     public void testHowRateDropsMetrics() throws Exception {
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t1 -> (t2 -1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         // Measure at bucket boundary, (though same issue exists all periodic 
> measurements)
>         final double m1 = rate.measure(metricsConfig, t(2));    // m1 = 1.0
>         // Third sample window from t2 -> (t3 -1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t3 -> (t4 -1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         // Measure second pair of samples:
>         final double m2 = rate.measure(metricsConfig, t(4));    // m2 = 1.0
>         assertEquals("Measurement of the rate over the first two samples", 
> 500.0, m1, 2.0);
>         assertEquals("Measurement of the rate over the last two samples", 
> 500.0, m2, 2.0);
>     }
>     @Test
>     public void testHowRateDropsMetricsWithRetardedObservations() throws 
> Exception {
>         final long retardation = 1000;
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t1 -> (t2 -1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         double m1 = 0.0;
>         // Third sample window from t2 -> (t3 -1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>             if (time == t(2) + retardation) {
>                 m1 = rate.measure(metricsConfig, time); // // m1 = 
> 65.something
>             }
>         }
>         // Second sample window from t3 -> (t4 -1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         double m2 = 0.0;
>         // Fifth sample window from t4 -> (t5 -1), with rate 999 per second:
>         for (long time = t(4); time != t(5); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>             if (time == t(4) + retardation) {
>                 m2 = rate.measure(metricsConfig, time); // m2 = 65.something
>             }
>         }
>         assertTrue("Measurement of the rate over the first two samples should 
> be well over 250 per sec", m1 > 250.0);
>         assertTrue("Measurement of the rate over the last two samples should 
> be well over 250 per sec", m2 > 250.0);
>     }
>     @Test
>     public void testHowCountDropsMetrics() throws Exception {
>         Count count = new Count();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>         // Record count for constant rate of 1 per second over entire 60 
> second period:
>         for (long time = t(0); time != t(2); time += 1000) {
>             count.record(metricsConfig, 1, time);
>         }
>         final double m1 = count.measure(metricsConfig, t(2) -1);    // m1 = 60
>         count.record(metricsConfig, 1, t(2));
>         final double m2 = count.measure(metricsConfig, t(2));       // m2 = 31
>         assertEquals("Measurement of the count at end of the 60 second 
> period", 60, m1, 0.1);
>         assertEquals("Measurement of the count at 1ms after the 60 second 
> period", 60, m2, 0.1);
>     }
> }
> {code}
> I'm happy to work on the solution to this, but first want to check I've not 
> missed something and I'm not doing something stupid. I think my reasoning is 
> sound.
> One solution would be to keep 'n + 1' samples, and only combine the complete 
> 'n' samples when asked for a value. While this would provide the correct 
> metrics, such metrics would respond in a delayed manner to any change, with 
> the maximum delay being the configured time of 1 sample, e.g, by default 30s 
> maximum and 15s average delay. 
> Which is better: correct delayed stats or incorrect responsive stats? Tough 
> question! Depends on the use-case for the stats - which I've noticed includes 
> internal quotas.  Personally, I'd err towards correct stats. But I'm 
> interested to hear others thoughts...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to