dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465886919



##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
+
+    @Test
+    public void testStrictQuotaEnforcementWithRate() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the avg rate to 3 which is 
already
+        // above the quota.
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the 
define quota:
+        // ((30 / 10) - 2) / 2 * 10 = 5s
+        time.sleep(5000);
+
+        // But, recording a second value is rejected because the avg rate is 
still equal
+        // to 3 after 5s.
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+        assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 
30, time.milliseconds()));
+
+        metrics.close();
+    }
+
+    @Test
+    public void testStrictQuotaEnforcementWithTokenBucket() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("credits", 
"test-group");
+        assertTrue(sensor.add(metricName, new TokenBucket()));
+        final KafkaMetric tkMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the remaining credits below 
zero
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the 
define quota:
+        // 10 / 2 = 5s
+        time.sleep(5000);
+
+        // Unlike the default rate based on a windowed sum, it works as 
expected.
+        assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1);
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    private void strictRecord(Sensor sensor, double value, long timeMs) {
+        synchronized (sensor) {
+            sensor.checkQuotas(timeMs);

Review comment:
       In the above two tests, I simulate a "strict quotas" in the sense that 
recording is not allowed if the quota is already violated. Therefore, I check 
it before recording the value.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to