apoorvmittal10 commented on code in PR #19329: URL: https://github.com/apache/kafka/pull/19329#discussion_r2038534993
########## server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java: ########## @@ -169,21 +176,69 @@ public synchronized boolean tryEvict(long now) { * @param memberId - The member id in the share fetch request. * @param now - The current time in milliseconds. * @param partitionMap - The topic partitions to be added to the session. + * @param clientConnectionId - The client connection id. * @return - The session key if the session was created, or null if the session was not created. */ - public synchronized ShareSessionKey maybeCreateSession(String groupId, Uuid memberId, long now, ImplicitLinkedHashCollection<CachedSharePartition> partitionMap) { + public synchronized ShareSessionKey maybeCreateSession( + String groupId, + Uuid memberId, + long now, + ImplicitLinkedHashCollection<CachedSharePartition> partitionMap, + String clientConnectionId + ) { if (sessions.size() < maxEntries || tryEvict(now)) { ShareSession session = new ShareSession(new ShareSessionKey(groupId, memberId), partitionMap, now, now, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)); sessions.put(session.key(), session); touch(session, now); + sessionClientIdMapping.add(session.key(), clientConnectionId); return session.key(); } return null; } + public ConnectionDisconnectListener connectionDisconnectListener() { + return connectionDisconnectListener; + } + // Visible for testing. Meter evictionsMeter() { return evictionsMeter; } + + private final class ClientConnectionDisconnectListener implements ConnectionDisconnectListener { + + // When the client disconnects, the corresponding session should be removed from the cache. + @Override + public void onDisconnect(String connectionId) { + ShareSessionKey shareSessionKey = sessionClientIdMapping.getShareSessionKey(connectionId); + if (shareSessionKey != null) { + remove(shareSessionKey); + } + } + } + + private static class SessionClientIdMapping { + // A map of session key to client connection id. + private final Map<ShareSessionKey, String> sessionConnectionIdMap = new HashMap<>(); + + // A map of client connection id to session key. + private final Map<String, ShareSessionKey> connectionIdSessionKeyMap = new HashMap<>(); + + private void add(ShareSessionKey key, String connectionId) { + sessionConnectionIdMap.put(key, connectionId); + connectionIdSessionKeyMap.put(connectionId, key); + } + + private ShareSessionKey getShareSessionKey(String connectionId) { + return connectionIdSessionKeyMap.get(connectionId); + } + + private void remove(ShareSessionKey shareSessionKey) { + String connectionId = sessionConnectionIdMap.remove(shareSessionKey); + if (connectionId != null) { + connectionIdSessionKeyMap.remove(connectionId); + } + } + } Review Comment: Why it needs a bidirectional map and can't be done using a single map of `private final Map<String, ShareSessionKey> connectionIdSessionKeyMap = new HashMap<>();`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org