chirag-wadhwa5 commented on code in PR #19329: URL: https://github.com/apache/kafka/pull/19329#discussion_r2039100729
########## server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java: ########## @@ -169,21 +176,69 @@ public synchronized boolean tryEvict(long now) { * @param memberId - The member id in the share fetch request. * @param now - The current time in milliseconds. * @param partitionMap - The topic partitions to be added to the session. + * @param clientConnectionId - The client connection id. * @return - The session key if the session was created, or null if the session was not created. */ - public synchronized ShareSessionKey maybeCreateSession(String groupId, Uuid memberId, long now, ImplicitLinkedHashCollection<CachedSharePartition> partitionMap) { + public synchronized ShareSessionKey maybeCreateSession( + String groupId, + Uuid memberId, + long now, + ImplicitLinkedHashCollection<CachedSharePartition> partitionMap, + String clientConnectionId + ) { if (sessions.size() < maxEntries || tryEvict(now)) { ShareSession session = new ShareSession(new ShareSessionKey(groupId, memberId), partitionMap, now, now, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)); sessions.put(session.key(), session); touch(session, now); + sessionClientIdMapping.add(session.key(), clientConnectionId); return session.key(); } return null; } + public ConnectionDisconnectListener connectionDisconnectListener() { + return connectionDisconnectListener; + } + // Visible for testing. Meter evictionsMeter() { return evictionsMeter; } + + private final class ClientConnectionDisconnectListener implements ConnectionDisconnectListener { + + // When the client disconnects, the corresponding session should be removed from the cache. + @Override + public void onDisconnect(String connectionId) { + ShareSessionKey shareSessionKey = sessionClientIdMapping.getShareSessionKey(connectionId); + if (shareSessionKey != null) { + remove(shareSessionKey); + } + } + } + + private static class SessionClientIdMapping { + // A map of session key to client connection id. + private final Map<ShareSessionKey, String> sessionConnectionIdMap = new HashMap<>(); + + // A map of client connection id to session key. + private final Map<String, ShareSessionKey> connectionIdSessionKeyMap = new HashMap<>(); + + private void add(ShareSessionKey key, String connectionId) { + sessionConnectionIdMap.put(key, connectionId); + connectionIdSessionKeyMap.put(connectionId, key); + } + + private ShareSessionKey getShareSessionKey(String connectionId) { + return connectionIdSessionKeyMap.get(connectionId); + } + + private void remove(ShareSessionKey shareSessionKey) { + String connectionId = sessionConnectionIdMap.remove(shareSessionKey); + if (connectionId != null) { + connectionIdSessionKeyMap.remove(connectionId); + } + } + } Review Comment: Thanks for the review. In `ClientMetricsManager` class, we only need to access the map from the connectionId, so there it was fine to use a single map. But in the case of `ShareSessionCache`, when the connection drops, we need to access the map using connection ID to remove the entry from map, and when a share session needs to be dropped because of a Final Share Fetch request, then we need to access the map using shareSessionKey. Thus we need a bidirectional mapping between connection ID and ShareSessionKey -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org