junrao commented on code in PR #17474:
URL: https://github.com/apache/kafka/pull/17474#discussion_r1800244484


##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -114,13 +117,15 @@ public ClientMetricsManager(ClientMetricsReceiverPlugin receiverPlugin, int clie
         this.subscriptionMap = new ConcurrentHashMap<>();
         this.subscriptionUpdateVersion = new AtomicInteger(0);
         this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CACHE_MAX_SIZE));
+        this.clientConnectionIdMap = new ConcurrentHashMap<>();
         this.expirationTimer = new SystemTimerReaper(CLIENT_METRICS_REAPER_THREAD_NAME, new SystemTimer("client-metrics"));
         this.clientTelemetryMaxBytes = clientTelemetryMaxBytes;
         this.time = time;
         this.cacheExpiryMs = cacheExpiryMs;
         this.lastCacheErrorLogMs = new AtomicLong(0);
         this.metrics = metrics;
         this.clientMetricsStats = new ClientMetricsStats();
+        this.connectionDisconnectListener = new ClientConnectionDisconnectListener(clientInstanceCache, clientConnectionIdMap, clientMetricsStats);

Review Comment:
   ClientConnectionDisconnectListener is a private inner class, so it can access 
all members of the enclosing class directly. Do we need to pass in 
clientInstanceCache, clientConnectionIdMap, and clientMetricsStats?
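
   To illustrate the point, here is a minimal, self-contained sketch of a 
non-static inner class reading the enclosing instance's fields without any 
constructor arguments. All names (ListenerSketch, DisconnectListener, 
register, cachedInstances) are hypothetical stand-ins and plain String maps 
replace the real cache and stats types; this is not the code in the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ListenerSketch {

    // Hypothetical stand-in for ConnectionDisconnectListener.
    interface DisconnectListener {
        void onDisconnect(String connectionId);
    }

    // Hypothetical stand-ins for clientConnectionIdMap and clientInstanceCache.
    private final Map<String, String> connectionIdMap = new ConcurrentHashMap<>();
    private final Map<String, String> instanceCache = new ConcurrentHashMap<>();

    // No arguments needed: the inner class sees the enclosing instance's fields.
    private final DisconnectListener listener = new Listener();

    private final class Listener implements DisconnectListener {
        @Override
        public void onDisconnect(String connectionId) {
            // Fields of the enclosing ListenerSketch instance are used directly.
            String instanceId = connectionIdMap.remove(connectionId);
            if (instanceId != null) {
                instanceCache.remove(instanceId);
            }
        }
    }

    void register(String connectionId, String instanceId) {
        connectionIdMap.put(connectionId, instanceId);
        instanceCache.put(instanceId, "metrics-state");
    }

    DisconnectListener listener() {
        return listener;
    }

    int cachedInstances() {
        return instanceCache.size();
    }

    public static void main(String[] args) {
        ListenerSketch manager = new ListenerSketch();
        manager.register("conn-1", "instance-1");
        manager.listener().onDisconnect("conn-1");
        System.out.println(manager.cachedInstances());
    }
}
```

   Dropping the constructor parameters would also remove the three duplicated 
fields in the listener, assuming it never needs to be constructed outside the 
enclosing class.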



##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -459,6 +469,48 @@ Timer expirationTimer() {
         return expirationTimer;
     }
 
+    // Visible for testing
+    Map<String, Uuid> clientConnectionIdMap() {
+        return clientConnectionIdMap;
+    }
+
+    private final class ClientConnectionDisconnectListener implements ConnectionDisconnectListener {
+
+        private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+        private final Map<String, Uuid> clientConnectionIdMap;
+        private final ClientMetricsStats clientMetricsStats;
+
+        ClientConnectionDisconnectListener(
+            Cache<Uuid, ClientMetricsInstance> clientInstanceCache,
+            Map<String, Uuid> clientConnectionIdMap,
+            ClientMetricsStats clientMetricsStats
+        ) {
+            this.clientInstanceCache = clientInstanceCache;
+            this.clientConnectionIdMap = clientConnectionIdMap;
+            this.clientMetricsStats = clientMetricsStats;
+        }
+
+        @Override
+        public void onDisconnect(String connectionId) {
+            log.trace("Removing client connection id [{}] from the client instance cache", connectionId);
+
+            Uuid clientInstanceId = clientConnectionIdMap.remove(connectionId);
+            if (clientInstanceId == null) {
+              log.trace("Client connection id [{}] is not found in the client instance cache", connectionId);
+              return;
+            }
+
+            // Unregister the client instance metrics from the broker metrics.
+            clientMetricsStats.unregisterClientInstanceMetrics(clientInstanceId);
+
+            ClientMetricsInstance clientInstance = clientInstanceCache.get(clientInstanceId);
+            if (clientInstance != null) {
+                clientInstance.cancelExpirationTimerTask();
+                clientInstanceCache.remove(clientInstanceId);

Review Comment:
   This means that every time we restart a broker, we could prematurely remove 
a live clientInstanceId. Do we gain more than we lose?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
