dajac commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465888817



##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
+
+    @Test
+    public void testStrictQuotaEnforcementWithRate() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the avg rate to 3 which is 
already
+        // above the quota.
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the 
define quota:
+        // ((30 / 10) - 2) / 2 * 10 = 5s
+        time.sleep(5000);
+
+        // But, recording a second value is rejected because the avg rate is 
still equal
+        // to 3 after 5s.
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+        assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 
30, time.milliseconds()));
+
+        metrics.close();
+    }
+
+    @Test
+    public void testStrictQuotaEnforcementWithTokenBucket() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("credits", 
"test-group");
+        assertTrue(sensor.add(metricName, new TokenBucket()));
+        final KafkaMetric tkMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the remaining credits below 
zero
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the 
define quota:
+        // 10 / 2 = 5s
+        time.sleep(5000);
+
+        // Unlike the default rate based on a windowed sum, it works as 
expected.
+        assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1);
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    private void strictRecord(Sensor sensor, double value, long timeMs) {
+        synchronized (sensor) {
+            sensor.checkQuotas(timeMs);
+            sensor.record(value, timeMs, false);
+        }
+    }
+
+    @Test
+    public void testRecordAndCheckQuotaUseMetricConfigOfEachStat() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor");
+
+        final MeasurableStat stat1 = Mockito.mock(MeasurableStat.class);
+        final MetricName stat1Name = metrics.metricName("stat1", "test-group");
+        final MetricConfig stat1Config = new 
MetricConfig().quota(Quota.upperBound(5));
+        sensor.add(stat1Name, stat1, stat1Config);
+
+        final MeasurableStat stat2 = Mockito.mock(MeasurableStat.class);
+        final MetricName stat2Name = metrics.metricName("stat2", "test-group");
+        final MetricConfig stat2Config = new 
MetricConfig().quota(Quota.upperBound(10));
+        sensor.add(stat2Name, stat2, stat2Config);
+
+        sensor.record(10, 1);
+        Mockito.verify(stat1).record(stat1Config, 10, 1);
+        Mockito.verify(stat2).record(stat2Config, 10, 1);
+
+        Mockito.when(stat1.measure(stat1Config, 2)).thenReturn(2.0);
+        Mockito.when(stat2.measure(stat2Config, 2)).thenReturn(2.0);
+        sensor.checkQuotas(2);

Review comment:
       No. This test actually reproduces a bug that I found. Basically, a 
stat can be added to the Sensor with its own MetricConfig, but the Sensor was 
not using the provided config when recording a value; it always used the 
Sensor's own config instead. This test verifies that the correct config is 
used both for recording and for measuring, the latter via a call to checkQuotas. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to