AndrewJSchofield commented on code in PR #19329: URL: https://github.com/apache/kafka/pull/19329#discussion_r2026942159
########## core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala: ########## @@ -885,6 +897,23 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { ) } + protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse]( + request: AbstractRequest, + destination: Int + )(implicit classTag: ClassTag[T]): T = { + val socket = IntegrationTestUtils.connect(brokerSocketServer(destination), cluster.clientListener()) + openSockets += socket + IntegrationTestUtils.sendAndReceive[T](request, socket) + } + + protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse]( + request: AbstractRequest Review Comment: nit: Indentation should match the previous method. ########## server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java: ########## @@ -171,19 +178,66 @@ public synchronized boolean tryEvict(long now) { * @param partitionMap - The topic partitions to be added to the session. * @return - The session key if the session was created, or null if the session was not created. Review Comment: Please add `clientConnectionId` to the javadoc comment. ########## server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java: ########## @@ -171,19 +178,66 @@ public synchronized boolean tryEvict(long now) { * @param partitionMap - The topic partitions to be added to the session. * @return - The session key if the session was created, or null if the session was not created. */ - public synchronized ShareSessionKey maybeCreateSession(String groupId, Uuid memberId, long now, ImplicitLinkedHashCollection<CachedSharePartition> partitionMap) { + public synchronized ShareSessionKey maybeCreateSession( + String groupId, + Uuid memberId, + long now, + ImplicitLinkedHashCollection<CachedSharePartition> partitionMap, + String clientConnectionId + ) { if (sessions.size() < maxEntries || tryEvict(now)) { ShareSession session = new ShareSession(new ShareSessionKey(groupId, memberId), partitionMap, now, now, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)); sessions.put(session.key(), session); touch(session, now); + sessionClientIdMapping.add(session.key(), clientConnectionId); return session.key(); } return null; } + public ConnectionDisconnectListener connectionDisconnectListener() { + return connectionDisconnectListener; + } + // Visible for testing. Meter evictionsMeter() { return evictionsMeter; } + + private final class ClientConnectionDisconnectListener implements ConnectionDisconnectListener { + + // When the client disconnect, the corresponding session should be removed from the cache. Review Comment: nit: "disconnects" ########## server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java: ########## @@ -148,6 +148,24 @@ public void testResizeCachedSessions() throws InterruptedException { assertEquals(0, cache.evictionsMeter().count()); } + @Test + public void testRemoveConnection() throws InterruptedException { + ShareSessionCache cache = new ShareSessionCache(3, 100); + assertEquals(0, cache.size()); + ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(10), "conn-1"); + ShareSessionKey key2 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 10, mockedSharePartitionMap(20), "conn-2"); + ShareSessionKey key3 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 20, mockedSharePartitionMap(30), "conn-3"); + + // Since cache size is now equal to max entries allowed(3), no new session can be created. + assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 30, mockedSharePartitionMap(40), "conn-4")); + assertNull(cache.maybeCreateSession("grp", Uuid.randomUuid(), 40, mockedSharePartitionMap(5), "conn-5")); + assertShareCacheContains(cache, List.of(key1, key2, key3)); + + // Simulating the disconnection of client with connection id conn-1 + cache.connectionDisconnectListener().onDisconnect("conn-1"); + assertShareCacheContains(cache, List.of(key2, key3)); + } Review Comment: And now show that you can add another. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org