apoorvmittal10 commented on code in PR #19329: URL: https://github.com/apache/kafka/pull/19329#discussion_r2060711375
########## core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala: ########## @@ -2052,12 +2052,99 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareFetchResponseData.errorCode) } + @ClusterTests( + Array( + new ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"), + new ClusterConfigProperty(key = "group.share.max.size", value="2") + ) + ), + new ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.share.persister.class.name", value = "org.apache.kafka.server.share.persister.DefaultStatePersister"), + new ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"), + new 
ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"), + new ClusterConfigProperty(key = "group.share.max.size", value="2") + ) + ), + ) + ) + def testShareSessionEvictedOnConnectionDrop(): Unit = { + val groupId: String = "group" + val memberId1 = Uuid.randomUuid() + val memberId2 = Uuid.randomUuid() + val memberId3 = Uuid.randomUuid() + + val topic = "topic" + val partition = 0 + + createTopicAndReturnLeaders(topic, numPartitions = 3) + val topicIds = getTopicIds.asJava + val topicId = topicIds.get(topic) + val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // member1 sends share fetch request to register its share session. Note it does not close the socket connection after. Review Comment: nit ```suggestion // member1 sends share fetch request to register its share session. Note: it does not close the socket connection after. ``` ########## core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala: ########## @@ -54,7 +55,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo ) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) Review Comment: So the reason for all of them to be on an open socket is that otherwise their session will be released, but that should not always be a problem, correct? So do you need to change every method here? 
########## core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala: ########## @@ -2052,12 +2052,99 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareFetchResponseData.errorCode) } + @ClusterTests( + Array( + new ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"), + new ClusterConfigProperty(key = "group.share.max.size", value="2") + ) + ), + new ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.share.persister.class.name", value = "org.apache.kafka.server.share.persister.DefaultStatePersister"), + new ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"), + new 
ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"), + new ClusterConfigProperty(key = "group.share.max.size", value="2") + ) + ), + ) + ) + def testShareSessionEvictedOnConnectionDrop(): Unit = { + val groupId: String = "group" + val memberId1 = Uuid.randomUuid() + val memberId2 = Uuid.randomUuid() + val memberId3 = Uuid.randomUuid() + + val topic = "topic" + val partition = 0 + + createTopicAndReturnLeaders(topic, numPartitions = 3) + val topicIds = getTopicIds.asJava + val topicId = topicIds.get(topic) + val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // member1 sends share fetch request to register its share session. Note it does not close the socket connection after. + TestUtils.waitUntilTrue(() => { + val metadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH) + val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) + val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponseData = shareFetchResponse.data() + shareFetchResponseData.errorCode == Errors.NONE.code + }, "Share fetch request failed", 5000) + + // member2 sends share fetch request to register its share session. Note it does not close the socket connection after. 
+ TestUtils.waitUntilTrue(() => { + val metadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH) + val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) + val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponseData = shareFetchResponse.data() + shareFetchResponseData.errorCode == Errors.NONE.code + }, "Share fetch request failed", 5000) + + // member3 sends share fetch request to register its share session. Since the maximum number of share sessions that could + // exist in the share session cache is 2 (group.share.max.groups * group.share.max.size), the attempt to register a third Review Comment: `(group.share.max.groups * group.share.max.size)` seems incorrect now; presumably the limit comes from `group.share.max.share.sessions`, which this test sets to 2. ########## server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java: ########## @@ -53,17 +54,22 @@ public class ShareSessionCache { private final int maxEntries; private long numPartitions = 0; + private final ConnectionDisconnectListener connectionDisconnectListener; // A map of session key to ShareSession. 
+ private final Map<ShareSessionKey, ShareSession> sessions = new HashMap<>(); + private final Map<String, ShareSessionKey> connectionIdToSessionMapping; Review Comment: ```suggestion private final Map<String, ShareSessionKey> connectionIdToSessionMap; ``` ########## core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala: ########## @@ -592,6 +592,81 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { } } + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.share.max.size", value = "2") + )) + def testShareGroupMaxSizeConfigExceeded(): Unit = { + val groupId: String = "group" + val memberId1 = Uuid.randomUuid() + val memberId2 = Uuid.randomUuid() + val memberId3 = Uuid.randomUuid() + + val admin = cluster.admin() Review Comment: `Admin` implements `AutoCloseable`, so you can use try-with-resources (in this Scala test, presumably via `scala.util.Using`) and then the `finally` block won't be required. 
########## core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala: ########## @@ -2052,12 +2052,99 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareFetchResponseData.errorCode) } + @ClusterTests( + Array( + new ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"), + new ClusterConfigProperty(key = "group.share.max.size", value="2") + ) + ), + new ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.share.persister.class.name", value = "org.apache.kafka.server.share.persister.DefaultStatePersister"), + new ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"), + new 
ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"), + new ClusterConfigProperty(key = "group.share.max.size", value="2") + ) + ), + ) + ) + def testShareSessionEvictedOnConnectionDrop(): Unit = { + val groupId: String = "group" + val memberId1 = Uuid.randomUuid() + val memberId2 = Uuid.randomUuid() + val memberId3 = Uuid.randomUuid() + + val topic = "topic" + val partition = 0 + + createTopicAndReturnLeaders(topic, numPartitions = 3) + val topicIds = getTopicIds.asJava + val topicId = topicIds.get(topic) + val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // member1 sends share fetch request to register its share session. Note it does not close the socket connection after. Review Comment: Similarly at other places. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org