AndrewJSchofield commented on code in PR #15234:
URL: https://github.com/apache/kafka/pull/15234#discussion_r1459018939


##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -433,4 +459,26 @@ public Map<String, Pattern> matchPattern() {
             return matchPattern;
         }
     }
+
+    private final class ExpirationTimerTask extends TimerTask {
+
+        private final Uuid uuid;
+
+        private ExpirationTimerTask(Uuid uuid, long delayMs) {
+            super(delayMs);
+            this.uuid = uuid;
+        }
+
+        @Override
+        public void run() {
+            log.trace("Expiration timer task run for client instance id: {}, after delay ms: {}", uuid, delayMs);
+            if (!clientInstanceCache.remove(uuid)) {
+                // This can only happen if the client instance is removed from the cache by the LRU
+                // eviction policy before the expiration timer task is executed. Log a warning as broker
+                // cache is not able to hold all the client instances.
+                log.warn("Client metrics instance cache cannot find the client instance id: {}. The cache"

Review Comment:
   I wonder whether this is going to create a massive number of log entries if the cache is not able to hold all the client instances. It might even be worth keeping track of how often this situation occurs.
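One possible way to act on this, sketched under assumptions: count every miss, and let the warning through at most once per interval so a cache that is permanently too small cannot flood the broker log. `EvictionMissTracker`, `recordMiss` and `warnIntervalMs` are hypothetical names, not part of the PR or of Kafka; the clock is injected so the logic can be tested without sleeping.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper, not part of the PR or of Kafka: counts how often the
// expiration task finds its entry already evicted, and allows a warning at
// most once per warnIntervalMs so a full cache cannot flood the log.
final class EvictionMissTracker {
    // Sentinel meaning "no warning has been emitted yet".
    private static final long NEVER = Long.MIN_VALUE;

    private final AtomicLong missCount = new AtomicLong();
    private final AtomicLong lastWarnMs = new AtomicLong(NEVER);
    private final long warnIntervalMs;
    private final LongSupplier clockMs;

    EvictionMissTracker(long warnIntervalMs, LongSupplier clockMs) {
        this.warnIntervalMs = warnIntervalMs;
        this.clockMs = clockMs;
    }

    /**
     * Records one miss. Returns true if the caller should log a warning now,
     * i.e. no warning has been logged yet or the last one is at least
     * warnIntervalMs old.
     */
    boolean recordMiss() {
        missCount.incrementAndGet();
        long now = clockMs.getAsLong();
        long last = lastWarnMs.get();
        // Check the sentinel first so "now - last" cannot overflow.
        boolean due = last == NEVER || now - last >= warnIntervalMs;
        // compareAndSet lets only one of several concurrent callers claim the slot.
        return due && lastWarnMs.compareAndSet(last, now);
    }

    /** Total misses so far; a candidate value to expose as a metric. */
    long missCount() {
        return missCount.get();
    }
}
```

In `ExpirationTimerTask.run()` the call site would then look like `if (missTracker.recordMiss()) log.warn(...)`, with `missCount()` available for a gauge, so the frequency is still visible even when most warnings are suppressed.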



##########
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##########
@@ -919,4 +922,101 @@ public void testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws U
         // 1 request should fail with throttling error.
         assertEquals(1, throttlingErrorCount);
     }
+
+    @Test
+    public void testCacheEviction() throws UnknownHostException, InterruptedException {
+        Properties properties = new Properties();
+        properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+        properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100");
+        clientMetricsManager.updateSubscription("sub-1", properties);
+
+        GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData(), true).build();
+
+        GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, ClientMetricsTestUtils.requestContext());
+        assertEquals(Errors.NONE, response.error());
+
+        assertNotNull(clientMetricsManager.clientInstance(response.data().clientInstanceId()));
+        assertEquals(1, clientMetricsManager.expirationTimer().size());
+        // Advance the clock to trigger cache eviction, cache expiry should be 100 * 3 = 300 ms.
+        clientMetricsManager.expirationTimer().advanceClock(300);
+        assertTimeoutPreemptively(Duration.ofMillis(300), () -> {
+            // Validate that cache eviction happens and client instance is removed from cache.
+            while (clientMetricsManager.expirationTimer().size() != 0 || clientMetricsManager.clientInstance(response.data().clientInstanceId()) != null) {

Review Comment:
   nit: Break this super-long line please, just like the equivalent one in the 
next test.



##########
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##########
@@ -919,4 +922,101 @@ public void testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws U
         // 1 request should fail with throttling error.
         assertEquals(1, throttlingErrorCount);
     }
+
+    @Test
+    public void testCacheEviction() throws UnknownHostException, InterruptedException {
+        Properties properties = new Properties();
+        properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+        properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100");
+        clientMetricsManager.updateSubscription("sub-1", properties);
+
+        GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData(), true).build();
+
+        GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, ClientMetricsTestUtils.requestContext());
+        assertEquals(Errors.NONE, response.error());
+
+        assertNotNull(clientMetricsManager.clientInstance(response.data().clientInstanceId()));
+        assertEquals(1, clientMetricsManager.expirationTimer().size());
+        // Advance the clock to trigger cache eviction, cache expiry should be 100 * 3 = 300 ms.
+        clientMetricsManager.expirationTimer().advanceClock(300);
+        assertTimeoutPreemptively(Duration.ofMillis(300), () -> {

Review Comment:
   300ms does seem rather tight for a reliable test in general. Just checking 
that you are confident that this isn't a flaky test in the making.
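If 300 ms does prove flaky, a common alternative is to poll for the expected state against a much larger deadline with a short sleep between checks, so a slow CI machine only slows the test down instead of failing it. The helper below is a hypothetical stand-alone sketch (`WaitUtil.waitFor` is not an existing Kafka API); Kafka's own test utilities already provide `TestUtils.waitForCondition`, which is likely the better fit inside the Kafka test suite.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical stand-alone polling helper (not an existing Kafka API).
final class WaitUtil {
    private WaitUtil() {
    }

    /**
     * Re-checks condition every pollMs until it holds or timeoutMs has elapsed.
     * Returns true as soon as the condition holds, false on timeout or interrupt.
     */
    static boolean waitFor(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            // Compare via subtraction so nanoTime overflow is handled correctly.
            if (System.nanoTime() - deadlineNs >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the caller and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

In `testCacheEviction` the busy loop inside `assertTimeoutPreemptively` could then become a single `assertTrue(WaitUtil.waitFor(..., 10_000, 10))` over the same two conditions (timer size is zero and the client instance is gone), which keeps the assertion identical while giving the timer thread seconds rather than milliseconds.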



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
