chirag-wadhwa5 commented on code in PR #19329:
URL: https://github.com/apache/kafka/pull/19329#discussion_r2039100729


##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -169,21 +176,69 @@ public synchronized boolean tryEvict(long now) {
      * @param memberId - The member id in the share fetch request.
      * @param now - The current time in milliseconds.
      * @param partitionMap - The topic partitions to be added to the session.
+     * @param clientConnectionId - The client connection id.
      * @return - The session key if the session was created, or null if the 
session was not created.
      */
-    public synchronized ShareSessionKey maybeCreateSession(String groupId, 
Uuid memberId, long now, ImplicitLinkedHashCollection<CachedSharePartition> 
partitionMap) {
+    public synchronized ShareSessionKey maybeCreateSession(
+        String groupId,
+        Uuid memberId,
+        long now,
+        ImplicitLinkedHashCollection<CachedSharePartition> partitionMap,
+        String clientConnectionId
+    ) {
         if (sessions.size() < maxEntries || tryEvict(now)) {
             ShareSession session = new ShareSession(new 
ShareSessionKey(groupId, memberId), partitionMap,
                     now, now, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
             sessions.put(session.key(), session);
             touch(session, now);
+            sessionClientIdMapping.add(session.key(), clientConnectionId);
             return session.key();
         }
         return null;
     }
 
+    public ConnectionDisconnectListener connectionDisconnectListener() {
+        return connectionDisconnectListener;
+    }
+
     // Visible for testing.
     Meter evictionsMeter() {
         return evictionsMeter;
     }
+
+    private final class ClientConnectionDisconnectListener implements 
ConnectionDisconnectListener {
+
+        // When the client disconnects, the corresponding session should be 
removed from the cache.
+        @Override
+        public void onDisconnect(String connectionId) {
+            ShareSessionKey shareSessionKey = 
sessionClientIdMapping.getShareSessionKey(connectionId);
+            if (shareSessionKey != null) {
+                remove(shareSessionKey);
+            }
+        }
+    }
+
+    private static class SessionClientIdMapping {
+        // A map of session key to client connection id.
+        private final Map<ShareSessionKey, String> sessionConnectionIdMap = 
new HashMap<>();
+
+        // A map of client connection id to session key.
+        private final Map<String, ShareSessionKey> connectionIdSessionKeyMap = 
new HashMap<>();
+
+        private void add(ShareSessionKey key, String connectionId) {
+            sessionConnectionIdMap.put(key, connectionId);
+            connectionIdSessionKeyMap.put(connectionId, key);
+        }
+
+        private ShareSessionKey getShareSessionKey(String connectionId) {
+            return connectionIdSessionKeyMap.get(connectionId);
+        }
+
+        private void remove(ShareSessionKey shareSessionKey) {
+            String connectionId = 
sessionConnectionIdMap.remove(shareSessionKey);
+            if (connectionId != null) {
+                connectionIdSessionKeyMap.remove(connectionId);
+            }
+        }
+    }

Review Comment:
   Thanks for the review. In `ClientMetricsManager` class, we only need to 
access the map from the connectionId, so there it was fine to use a single map. 
But in the case of `ShareSessionCache`, when the connection drops, we need to 
access the map using connection ID to remove the entry from map, and when a 
share session needs to be dropped because of a Final Share Fetch request, then 
we need to access the map using shareSessionKey. Thus we need a bidirectional 
mapping between connection ID and ShareSessionKey



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to