AndrewJSchofield commented on code in PR #19329:
URL: https://github.com/apache/kafka/pull/19329#discussion_r2026942159


##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -885,6 +897,23 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     )
   }
 
+  protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
+    request: AbstractRequest,
+    destination: Int
+  )(implicit classTag: ClassTag[T]): T = {
+    val socket = IntegrationTestUtils.connect(brokerSocketServer(destination), 
cluster.clientListener())
+    openSockets += socket
+    IntegrationTestUtils.sendAndReceive[T](request, socket)
+  }
+
+  protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
+                                                                              
request: AbstractRequest

Review Comment:
   nit: Indentation should match the previous method.



##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -171,19 +178,66 @@ public synchronized boolean tryEvict(long now) {
      * @param partitionMap - The topic partitions to be added to the session.
      * @return - The session key if the session was created, or null if the 
session was not created.

Review Comment:
   Please add `clientConnectionId` to the javadoc comment.



##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -171,19 +178,66 @@ public synchronized boolean tryEvict(long now) {
      * @param partitionMap - The topic partitions to be added to the session.
      * @return - The session key if the session was created, or null if the 
session was not created.
      */
-    public synchronized ShareSessionKey maybeCreateSession(String groupId, 
Uuid memberId, long now, ImplicitLinkedHashCollection<CachedSharePartition> 
partitionMap) {
+    public synchronized ShareSessionKey maybeCreateSession(
+        String groupId,
+        Uuid memberId,
+        long now,
+        ImplicitLinkedHashCollection<CachedSharePartition> partitionMap,
+        String clientConnectionId
+    ) {
         if (sessions.size() < maxEntries || tryEvict(now)) {
             ShareSession session = new ShareSession(new 
ShareSessionKey(groupId, memberId), partitionMap,
                     now, now, 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH));
             sessions.put(session.key(), session);
             touch(session, now);
+            sessionClientIdMapping.add(session.key(), clientConnectionId);
             return session.key();
         }
         return null;
     }
 
+    public ConnectionDisconnectListener connectionDisconnectListener() {
+        return connectionDisconnectListener;
+    }
+
     // Visible for testing.
     Meter evictionsMeter() {
         return evictionsMeter;
     }
+
+    private final class ClientConnectionDisconnectListener implements 
ConnectionDisconnectListener {
+
+        // When the client disconnect, the corresponding session should be 
removed from the cache.

Review Comment:
   nit: "disconnect" should be "disconnects".



##########
server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java:
##########
@@ -148,6 +148,24 @@ public void testResizeCachedSessions() throws 
InterruptedException {
         assertEquals(0, cache.evictionsMeter().count());
     }
 
+    @Test
+    public void testRemoveConnection() throws InterruptedException {
+        ShareSessionCache cache = new ShareSessionCache(3, 100);
+        assertEquals(0, cache.size());
+        ShareSessionKey key1 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), 0, mockedSharePartitionMap(10), "conn-1");
+        ShareSessionKey key2 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), 10, mockedSharePartitionMap(20), "conn-2");
+        ShareSessionKey key3 = cache.maybeCreateSession("grp", 
Uuid.randomUuid(), 20, mockedSharePartitionMap(30), "conn-3");
+
+        // Since cache size is now equal to max entries allowed(3), no new 
session can be created.
+        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30, 
mockedSharePartitionMap(40), "conn-4"));
+        assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40, 
mockedSharePartitionMap(5), "conn-5"));
+        assertShareCacheContains(cache, List.of(key1, key2, key3));
+
+        // Simulating the disconnection of client with connection id conn-1
+        cache.connectionDisconnectListener().onDisconnect("conn-1");
+        assertShareCacheContains(cache, List.of(key2, key3));
+    }

Review Comment:
   And now show that you can add another session, since the disconnection has freed up a slot in the cache.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to