apoorvmittal10 commented on code in PR #19329:
URL: https://github.com/apache/kafka/pull/19329#discussion_r2060711375


##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -2052,12 +2052,99 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, send, 
Seq.empty, Map.empty)
-    shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+    shareFetchResponse = 
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, 
shareFetchResponseData.errorCode)
   }
 
+  @ClusterTests(
+    Array(
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+          new ClusterConfigProperty(key = "group.share.max.share.sessions", 
value="2"),
+          new ClusterConfigProperty(key = "group.share.max.size", value="2")
+        )
+      ),
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+          new ClusterConfigProperty(key = "group.share.persister.class.name", 
value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
+          new ClusterConfigProperty(key = 
"share.coordinator.state.topic.replication.factor", value = "1"),
+          new ClusterConfigProperty(key = 
"share.coordinator.state.topic.num.partitions", value = "1"),
+          new ClusterConfigProperty(key = "unstable.api.versions.enable", 
value = "true"),
+          new ClusterConfigProperty(key = "group.share.max.share.sessions", 
value="2"),
+          new ClusterConfigProperty(key = "group.share.max.size", value="2")
+        )
+      ),
+    )
+  )
+  def testShareSessionEvictedOnConnectionDrop(): Unit = {
+    val groupId: String = "group"
+    val memberId1 = Uuid.randomUuid()
+    val memberId2 = Uuid.randomUuid()
+    val memberId3 = Uuid.randomUuid()
+
+    val topic = "topic"
+    val partition = 0
+
+    createTopicAndReturnLeaders(topic, numPartitions = 3)
+    val topicIds = getTopicIds.asJava
+    val topicId = topicIds.get(topic)
+    val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
+
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // member1 sends share fetch request to register its share session. Note 
it does not close the socket connection after.

Review Comment:
   nit
   ```suggestion
        // member1 sends share fetch request to register its share session. Note: it does not close the socket connection after.
   ```



##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -54,7 +55,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     )
 
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, 
Seq.empty, Map.empty)
-    val shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+    val shareFetchResponse = 
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)

Review Comment:
   So the reason for keeping the socket open in all of these is that otherwise the session will be released, but that should not always be a problem, correct? So do you need to change every method here?



##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -2052,12 +2052,99 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, send, 
Seq.empty, Map.empty)
-    shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+    shareFetchResponse = 
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, 
shareFetchResponseData.errorCode)
   }
 
+  @ClusterTests(
+    Array(
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+          new ClusterConfigProperty(key = "group.share.max.share.sessions", 
value="2"),
+          new ClusterConfigProperty(key = "group.share.max.size", value="2")
+        )
+      ),
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+          new ClusterConfigProperty(key = "group.share.persister.class.name", 
value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
+          new ClusterConfigProperty(key = 
"share.coordinator.state.topic.replication.factor", value = "1"),
+          new ClusterConfigProperty(key = 
"share.coordinator.state.topic.num.partitions", value = "1"),
+          new ClusterConfigProperty(key = "unstable.api.versions.enable", 
value = "true"),
+          new ClusterConfigProperty(key = "group.share.max.share.sessions", 
value="2"),
+          new ClusterConfigProperty(key = "group.share.max.size", value="2")
+        )
+      ),
+    )
+  )
+  def testShareSessionEvictedOnConnectionDrop(): Unit = {
+    val groupId: String = "group"
+    val memberId1 = Uuid.randomUuid()
+    val memberId2 = Uuid.randomUuid()
+    val memberId3 = Uuid.randomUuid()
+
+    val topic = "topic"
+    val partition = 0
+
+    createTopicAndReturnLeaders(topic, numPartitions = 3)
+    val topicIds = getTopicIds.asJava
+    val topicId = topicIds.get(topic)
+    val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
+
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // member1 sends share fetch request to register its share session. Note 
it does not close the socket connection after.
+    TestUtils.waitUntilTrue(() => {
+      val metadata = new ShareRequestMetadata(memberId1, 
ShareRequestMetadata.INITIAL_EPOCH)
+      val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, 
Seq.empty, Map.empty)
+      val shareFetchResponse = 
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
+      val shareFetchResponseData = shareFetchResponse.data()
+      shareFetchResponseData.errorCode == Errors.NONE.code
+    }, "Share fetch request failed", 5000)
+
+    // member2 sends share fetch request to register its share session. Note 
it does not close the socket connection after.
+    TestUtils.waitUntilTrue(() => {
+      val metadata = new ShareRequestMetadata(memberId2, 
ShareRequestMetadata.INITIAL_EPOCH)
+      val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, 
Seq.empty, Map.empty)
+      val shareFetchResponse = 
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
+      val shareFetchResponseData = shareFetchResponse.data()
+      shareFetchResponseData.errorCode == Errors.NONE.code
+    }, "Share fetch request failed", 5000)
+
+    // member3 sends share fetch request to register its share session. Since 
the maximum number of share sessions that could
+    // exist in the share session cache is 2 (group.share.max.groups * 
group.share.max.size), the attempt to register a third

Review Comment:
   The expression `(group.share.max.groups * group.share.max.size)` seems incorrect now.



##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -53,17 +54,22 @@ public class ShareSessionCache {
 
     private final int maxEntries;
     private long numPartitions = 0;
+    private final ConnectionDisconnectListener connectionDisconnectListener;
 
     // A map of session key to ShareSession.
     private final Map<ShareSessionKey, ShareSession> sessions = new 
HashMap<>();
 
+    private final Map<String, ShareSessionKey> connectionIdToSessionMapping;

Review Comment:
   ```suggestion
       private final Map<String, ShareSessionKey> connectionIdToSessionMap;
   ```
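   For context, a small sketch of the mechanism this field supports, as described by the diff: the cache remembers which connection registered which session, and the disconnect listener uses that mapping to evict the session when the connection drops. The sketch below is a simplified illustration in Scala with made-up names (apart from `connectionIdToSessionMap`), not the actual Java class:
   ```scala
   import scala.collection.mutable

   // Simplified model of the idea only; the real ShareSessionCache is Java and
   // carries more state (maxEntries, numPartitions, the listener itself).
   class SessionCacheSketch[K, S] {
     private val sessions = mutable.Map.empty[K, S]
     private val connectionIdToSessionMap = mutable.Map.empty[String, K]

     // Register a session and remember which connection created it.
     def register(connectionId: String, key: K, session: S): Unit = synchronized {
       sessions.put(key, session)
       connectionIdToSessionMap.put(connectionId, key)
     }

     // What the ConnectionDisconnectListener callback boils down to: drop the
     // session owned by the connection that just went away.
     def onConnectionDisconnect(connectionId: String): Unit = synchronized {
       connectionIdToSessionMap.remove(connectionId).foreach(sessions.remove)
     }

     def size: Int = synchronized(sessions.size)
   }
   ```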



##########
core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala:
##########
@@ -592,6 +592,81 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
     }
   }
 
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = "group.share.enable", value = "true"),
+      new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+      new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+      new ClusterConfigProperty(key = "group.share.max.size", value = "2")
+    ))
+  def testShareGroupMaxSizeConfigExceeded(): Unit = {
+    val groupId: String = "group"
+    val memberId1 = Uuid.randomUuid()
+    val memberId2 = Uuid.randomUuid()
+    val memberId3 = Uuid.randomUuid()
+
+    val admin = cluster.admin()

Review Comment:
   Admin implements AutoCloseable. You can use try-with-resources, and then the `finally` block won't be required.
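   For illustration, a minimal Scala sketch of that suggestion; the `withAdmin` helper and the commented usage are assumptions for this example, not code from the PR, and `scala.util.Using` plays the role of try-with-resources here:
   ```scala
   import org.apache.kafka.clients.admin.Admin

   import scala.util.Using

   object AdminResourceSketch {
     // Admin implements AutoCloseable, so Using.resource closes it even if the body
     // throws, which removes the need for an explicit finally block in the test.
     def withAdmin[T](newAdmin: => Admin)(body: Admin => T): T =
       Using.resource(newAdmin)(body)
   }

   // Hypothetical usage inside the test:
   //   AdminResourceSketch.withAdmin(cluster.admin()) { admin =>
   //     // register members up to group.share.max.size and assert the next
   //     // heartbeat is rejected
   //   }
   ```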



##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -2052,12 +2052,99 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
     shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
     metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
     shareFetchRequest = createShareFetchRequest(groupId, metadata, send, 
Seq.empty, Map.empty)
-    shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+    shareFetchResponse = 
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
 
     shareFetchResponseData = shareFetchResponse.data()
     assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, 
shareFetchResponseData.errorCode)
   }
 
+  @ClusterTests(
+    Array(
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+          new ClusterConfigProperty(key = "group.share.max.share.sessions", 
value="2"),
+          new ClusterConfigProperty(key = "group.share.max.size", value="2")
+        )
+      ),
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = 
"group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = 
"true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", 
value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+          new ClusterConfigProperty(key = "group.share.persister.class.name", 
value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
+          new ClusterConfigProperty(key = 
"share.coordinator.state.topic.replication.factor", value = "1"),
+          new ClusterConfigProperty(key = 
"share.coordinator.state.topic.num.partitions", value = "1"),
+          new ClusterConfigProperty(key = "unstable.api.versions.enable", 
value = "true"),
+          new ClusterConfigProperty(key = "group.share.max.share.sessions", 
value="2"),
+          new ClusterConfigProperty(key = "group.share.max.size", value="2")
+        )
+      ),
+    )
+  )
+  def testShareSessionEvictedOnConnectionDrop(): Unit = {
+    val groupId: String = "group"
+    val memberId1 = Uuid.randomUuid()
+    val memberId2 = Uuid.randomUuid()
+    val memberId3 = Uuid.randomUuid()
+
+    val topic = "topic"
+    val partition = 0
+
+    createTopicAndReturnLeaders(topic, numPartitions = 3)
+    val topicIds = getTopicIds.asJava
+    val topicId = topicIds.get(topic)
+    val topicIdPartition = new TopicIdPartition(topicId, new 
TopicPartition(topic, partition))
+
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // member1 sends share fetch request to register its share session. Note 
it does not close the socket connection after.

Review Comment:
   Similarly at other places.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
