dajac commented on a change in pull request #9114: URL: https://github.com/apache/kafka/pull/9114#discussion_r465888817
########## File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java ########## @@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() { assertThat(sensor.hasMetrics(), is(true)); } + + @Test + public void testStrictQuotaEnforcementWithRate() { + final Time time = new MockTime(0, System.currentTimeMillis(), 0); + final Metrics metrics = new Metrics(time); + final Sensor sensor = metrics.sensor("sensor", new MetricConfig() + .quota(Quota.upperBound(2)) + .timeWindow(1, TimeUnit.SECONDS) + .samples(11)); + final MetricName metricName = metrics.metricName("rate", "test-group"); + assertTrue(sensor.add(metricName, new Rate())); + final KafkaMetric rateMetric = metrics.metric(metricName); + + // Recording a first value at T+0 to bring the avg rate to 3 which is already + // above the quota. + strictRecord(sensor, 30, time.milliseconds()); + assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1); + + // Theoretically, we should wait 5s to bring back the avg rate to the define quota: + // ((30 / 10) - 2) / 2 * 10 = 5s + time.sleep(5000); + + // But, recording a second value is rejected because the avg rate is still equal + // to 3 after 5s. + assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1); + assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 30, time.milliseconds())); + + metrics.close(); + } + + @Test + public void testStrictQuotaEnforcementWithTokenBucket() { + final Time time = new MockTime(0, System.currentTimeMillis(), 0); + final Metrics metrics = new Metrics(time); + final Sensor sensor = metrics.sensor("sensor", new MetricConfig() + .quota(Quota.upperBound(2)) + .timeWindow(1, TimeUnit.SECONDS) + .samples(11)); + final MetricName metricName = metrics.metricName("credits", "test-group"); + assertTrue(sensor.add(metricName, new TokenBucket())); + final KafkaMetric tkMetric = metrics.metric(metricName); + + // Recording a first value at T+0 to bring the remaining credits below zero + strictRecord(sensor, 30, time.milliseconds()); + assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1); + + // Theoretically, we should wait 5s to bring back the avg rate to the define quota: + // 10 / 2 = 5s + time.sleep(5000); + + // Unlike the default rate based on a windowed sum, it works as expected. + assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1); + strictRecord(sensor, 30, time.milliseconds()); + assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1); + + metrics.close(); + } + + private void strictRecord(Sensor sensor, double value, long timeMs) { + synchronized (sensor) { + sensor.checkQuotas(timeMs); + sensor.record(value, timeMs, false); + } + } + + @Test + public void testRecordAndCheckQuotaUseMetricConfigOfEachStat() { + final Time time = new MockTime(0, System.currentTimeMillis(), 0); + final Metrics metrics = new Metrics(time); + final Sensor sensor = metrics.sensor("sensor"); + + final MeasurableStat stat1 = Mockito.mock(MeasurableStat.class); + final MetricName stat1Name = metrics.metricName("stat1", "test-group"); + final MetricConfig stat1Config = new MetricConfig().quota(Quota.upperBound(5)); + sensor.add(stat1Name, stat1, stat1Config); + + final MeasurableStat stat2 = Mockito.mock(MeasurableStat.class); + final MetricName stat2Name = metrics.metricName("stat2", "test-group"); + final MetricConfig stat2Config = new MetricConfig().quota(Quota.upperBound(10)); + sensor.add(stat2Name, stat2, stat2Config); + + sensor.record(10, 1); + Mockito.verify(stat1).record(stat1Config, 10, 1); + Mockito.verify(stat2).record(stat2Config, 10, 1); + + Mockito.when(stat1.measure(stat1Config, 2)).thenReturn(2.0); + Mockito.when(stat2.measure(stat2Config, 2)).thenReturn(2.0); + sensor.checkQuotas(2); Review comment: No. This test actually reproduce a bug that I have found. Basically, a stat can be added to the Sensor with a MetricsConfig but the Sensor was not using the provided one when recording a value but was using the one of the Sensor all the time. This test verifies that the correct config is used both for recording and measuring via calling checkQuota. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org