dajac commented on a change in pull request #8977:
URL: https://github.com/apache/kafka/pull/8977#discussion_r449633334
##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,139 @@ public void shouldReturnPresenceOfMetrics() {
assertThat(sensor.hasMetrics(), is(true));
}
-}
\ No newline at end of file
+
+ @Test
+ public void testStrictQuotaEnforcement() {
+ final Time time = new MockTime(0, 0, 0);
+ final Metrics metrics = new Metrics(time);
+ final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+ .quota(Quota.upperBound(10))
+ .timeWindow(1, TimeUnit.SECONDS)
+ .samples(11));
+ final MetricName metricName = metrics.metricName("rate", "test-group");
+ assertTrue(sensor.add(metricName, new Rate()));
+ final KafkaMetric rateMetric = metrics.metric(metricName);
+
+ // Recording a first value at T+0s to bring the avg rate to 9. Value
is accepted
+ // because the quota is not exhausted yet.
+ sensor.record(90, time.milliseconds(), QuotaEnforcementType.STRICT);
+ assertEquals(9, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+ // Recording a second value at T+1s to bring the avg rate to 18. Value
is accepted
+ // because the quota is not exhausted yet.
+ time.sleep(1000);
+ sensor.record(90, time.milliseconds(), QuotaEnforcementType.STRICT);
+ assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+ // Recording a third value at T+2s is rejected immediately and rate is
not updated
+ // because the quota is exhausted.
+ time.sleep(1000);
+ assertThrows(QuotaViolationException.class,
+ () -> sensor.record(90, time.milliseconds(),
QuotaEnforcementType.STRICT));
+ assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+ metrics.close();
+ }
+
+ @Test
+ public void testPermissiveQuotaEnforcement() {
+ final Time time = new MockTime(0, 0, 0);
+ final Metrics metrics = new Metrics(time);
+ final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+ .quota(Quota.upperBound(10))
+ .timeWindow(1, TimeUnit.SECONDS)
+ .samples(11));
+ final MetricName metricName = metrics.metricName("rate", "test-group");
+ assertTrue(sensor.add(metricName, new Rate()));
+ final KafkaMetric rateMetric = metrics.metric(metricName);
+
+ // Recording a first value at T+0s to bring the avg rate to 9. Value
is accepted
+ // because the quota is not exhausted yet.
+ sensor.record(90, time.milliseconds(),
QuotaEnforcementType.PERMISSIVE);
+ assertEquals(9, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+ // Recording a second value at T+1s to bring the avg rate to 18. Value
is accepted
+ // and rate is updated even though the quota is exhausted.
+ time.sleep(1000);
+ assertThrows(QuotaViolationException.class,
+ () -> sensor.record(90, time.milliseconds(),
QuotaEnforcementType.PERMISSIVE));
+ assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+ // Recording a third value at T+2s to bring the avg rate to 27. Value
is accepted
+ // and rate is updated even though the quota is exhausted.
+ time.sleep(1000);
+ assertThrows(QuotaViolationException.class,
+ () -> sensor.record(90, time.milliseconds(),
QuotaEnforcementType.PERMISSIVE));
+ assertEquals(27, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+ metrics.close();
+ }
+
+ @Test
+ public void testStrictQuotaEnforcementWithDefaultRate() {
Review comment:
This test illustrates the problem that we are trying to resolve.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]