junrao commented on code in PR #17474: URL: https://github.com/apache/kafka/pull/17474#discussion_r1800244484
########## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ########## @@ -114,13 +117,15 @@ public ClientMetricsManager(ClientMetricsReceiverPlugin receiverPlugin, int clie this.subscriptionMap = new ConcurrentHashMap<>(); this.subscriptionUpdateVersion = new AtomicInteger(0); this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CACHE_MAX_SIZE)); + this.clientConnectionIdMap = new ConcurrentHashMap<>(); this.expirationTimer = new SystemTimerReaper(CLIENT_METRICS_REAPER_THREAD_NAME, new SystemTimer("client-metrics")); this.clientTelemetryMaxBytes = clientTelemetryMaxBytes; this.time = time; this.cacheExpiryMs = cacheExpiryMs; this.lastCacheErrorLogMs = new AtomicLong(0); this.metrics = metrics; this.clientMetricsStats = new ClientMetricsStats(); + this.connectionDisconnectListener = new ClientConnectionDisconnectListener(clientInstanceCache, clientConnectionIdMap, clientMetricsStats); Review Comment: ClientConnectionDisconnectListener is a private inner class and can access all members of the enclosing class. Do we need to pass in clientInstanceCache, clientConnectionIdMap, clientMetricsStats? 
########## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ########## @@ -459,6 +469,48 @@ Timer expirationTimer() { return expirationTimer; } + // Visible for testing + Map<String, Uuid> clientConnectionIdMap() { + return clientConnectionIdMap; + } + + private final class ClientConnectionDisconnectListener implements ConnectionDisconnectListener { + + private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache; + private final Map<String, Uuid> clientConnectionIdMap; + private final ClientMetricsStats clientMetricsStats; + + ClientConnectionDisconnectListener( + Cache<Uuid, ClientMetricsInstance> clientInstanceCache, + Map<String, Uuid> clientConnectionIdMap, + ClientMetricsStats clientMetricsStats + ) { + this.clientInstanceCache = clientInstanceCache; + this.clientConnectionIdMap = clientConnectionIdMap; + this.clientMetricsStats = clientMetricsStats; + } + + @Override + public void onDisconnect(String connectionId) { + log.trace("Removing client connection id [{}] from the client instance cache", connectionId); + + Uuid clientInstanceId = clientConnectionIdMap.remove(connectionId); + if (clientInstanceId == null) { + log.trace("Client connection id [{}] is not found in the client instance cache", connectionId); + return; + } + + // Unregister the client instance metrics from the broker metrics. + clientMetricsStats.unregisterClientInstanceMetrics(clientInstanceId); + + ClientMetricsInstance clientInstance = clientInstanceCache.get(clientInstanceId); + if (clientInstance != null) { + clientInstance.cancelExpirationTimerTask(); + clientInstanceCache.remove(clientInstanceId); Review Comment: This means that every time we restart a broker, we could prematurely remove a live clientInstanceId. Do we gain more than we lose? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org