dajac commented on a change in pull request #9114: URL: https://github.com/apache/kafka/pull/9114#discussion_r465886919
########## File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java ########## @@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() { assertThat(sensor.hasMetrics(), is(true)); } + + @Test + public void testStrictQuotaEnforcementWithRate() { + final Time time = new MockTime(0, System.currentTimeMillis(), 0); + final Metrics metrics = new Metrics(time); + final Sensor sensor = metrics.sensor("sensor", new MetricConfig() + .quota(Quota.upperBound(2)) + .timeWindow(1, TimeUnit.SECONDS) + .samples(11)); + final MetricName metricName = metrics.metricName("rate", "test-group"); + assertTrue(sensor.add(metricName, new Rate())); + final KafkaMetric rateMetric = metrics.metric(metricName); + + // Recording a first value at T+0 to bring the avg rate to 3 which is already + // above the quota. + strictRecord(sensor, 30, time.milliseconds()); + assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1); + + // Theoretically, we should wait 5s to bring back the avg rate to the define quota: + // ((30 / 10) - 2) / 2 * 10 = 5s + time.sleep(5000); + + // But, recording a second value is rejected because the avg rate is still equal + // to 3 after 5s. 
+ assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1); + assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 30, time.milliseconds())); + + metrics.close(); + } + + @Test + public void testStrictQuotaEnforcementWithTokenBucket() { + final Time time = new MockTime(0, System.currentTimeMillis(), 0); + final Metrics metrics = new Metrics(time); + final Sensor sensor = metrics.sensor("sensor", new MetricConfig() + .quota(Quota.upperBound(2)) + .timeWindow(1, TimeUnit.SECONDS) + .samples(11)); + final MetricName metricName = metrics.metricName("credits", "test-group"); + assertTrue(sensor.add(metricName, new TokenBucket())); + final KafkaMetric tkMetric = metrics.metric(metricName); + + // Recording a first value at T+0 to bring the remaining credits below zero + strictRecord(sensor, 30, time.milliseconds()); + assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1); + + // Theoretically, we should wait 5s to bring back the avg rate to the define quota: + // 10 / 2 = 5s + time.sleep(5000); + + // Unlike the default rate based on a windowed sum, it works as expected. + assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1); + strictRecord(sensor, 30, time.milliseconds()); + assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1); + + metrics.close(); + } + + private void strictRecord(Sensor sensor, double value, long timeMs) { + synchronized (sensor) { + sensor.checkQuotas(timeMs); Review comment: In the above two tests, I simulate "strict quotas" in the sense that recording is not allowed if the quota is already violated. Therefore, I check the quota before recording the value. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org