AndrewJSchofield commented on code in PR #19329: URL: https://github.com/apache/kafka/pull/19329#discussion_r2063614759
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java: ########## @@ -2065,6 +2066,67 @@ public void testComplexShareConsumer() throws Exception { verifyShareGroupStateTopicRecordsProduced(); } + @ClusterTest( + brokers = 1, + serverProperties = { + @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"), + @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + @ClusterConfigProperty(key = "group.share.enable", value = "true"), + @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"), + @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"), + @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"), + @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"), + @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"), + @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"), + @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"), + @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + @ClusterConfigProperty(key = "group.share.max.size", value = "3") // Setting max group size to 3 + } + ) + public void testShareGroupMaxSizeConfigExceeded() throws Exception { + alterShareAutoOffsetReset("group1", "earliest"); + // creating 3 consumers in the group1 + ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer("group1"); + ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer("group1"); + ShareConsumer<byte[], byte[]> shareConsumer3 = createShareConsumer("group1"); + + shareConsumer1.subscribe(Set.of(tp.topic())); + shareConsumer2.subscribe(Set.of(tp.topic())); + shareConsumer3.subscribe(Set.of(tp.topic())); + + produceMessages(1); Review Comment: What's the point of producing the records here? If you're just checking the max group size, subscribing and polling is enough. And then you don't need to alter the auto-offset reset config because you wouldn't be consuming, which would make the test run much more quickly. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java: ########## @@ -2065,6 +2066,67 @@ public void testComplexShareConsumer() throws Exception { verifyShareGroupStateTopicRecordsProduced(); } + @ClusterTest( + brokers = 1, + serverProperties = { + @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"), + @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + @ClusterConfigProperty(key = "group.share.enable", value = "true"), + @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"), + @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"), + @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"), + @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"), + @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"), + @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"), + @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"), + @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), Review Comment: The APIs are now stable. No need for this config now. ########## core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala: ########## @@ -2052,12 +2052,99 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareFetchResponseData.errorCode) } + @ClusterTests( + Array( + new ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"), + new ClusterConfigProperty(key = "group.share.max.size", value="2") + ) + ), + new ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.share.persister.class.name", value = "org.apache.kafka.server.share.persister.DefaultStatePersister"), + new ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"), + new ClusterConfigProperty(key = "group.share.max.size", value="2") + ) + ), + ) + ) + def testShareSessionEvictedOnConnectionDrop(): Unit = { + val groupId: String = "group" + val memberId1 = Uuid.randomUuid() + val memberId2 = Uuid.randomUuid() + val memberId3 = Uuid.randomUuid() + + val topic = "topic" + val partition = 0 + + createTopicAndReturnLeaders(topic, numPartitions = 3) + val topicIds = getTopicIds.asJava + val topicId = topicIds.get(topic) + val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + + val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + + // member1 sends share fetch request to register it's share session. Note it does not close the socket connection after. + TestUtils.waitUntilTrue(() => { + val metadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH) + val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) + val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponseData = shareFetchResponse.data() + shareFetchResponseData.errorCode == Errors.NONE.code + }, "Share fetch request failed", 5000) + + // member2 sends share fetch request to register it's share session. Note it does not close the socket connection after. + TestUtils.waitUntilTrue(() => { + val metadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH) + val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) + val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponseData = shareFetchResponse.data() + shareFetchResponseData.errorCode == Errors.NONE.code + }, "Share fetch request failed", 5000) + + // member3 sends share fetch request to register it's share session. Since the maximum number of share sessions that could + // exist in the share session cache is 2 (group.share.max.share.sessions), the attempt to register a third + // share session with the ShareSessionCache would throw SHARE_SESSION_NOT_FOUND Review Comment: This is actually `SHARE_SESSION_LIMIT_REACHED` according to the KIP. ########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -237,7 +238,7 @@ public void testNewContextReturnsFinalContextWithRequestData() { // shareFetch is not empty, but the maxBytes of topic partition is 0, which means this is added only for acknowledgements. Review Comment: Haven't we removed partitionMaxBytes? ########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -423,8 +423,14 @@ private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Par * @param isAcknowledgeDataPresent This tells whether the fetch request received includes piggybacked acknowledgements or not Review Comment: nit: Missing parameter for `clientConnectionId` in javadoc comment. ########## core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala: ########## @@ -2052,12 +2052,99 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareFetchResponseData.errorCode) } + @ClusterTests( + Array( + new ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"), + new ClusterConfigProperty(key = "group.share.max.size", value="2") + ) + ), + new ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), + new ClusterConfigProperty(key = "group.share.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.share.persister.class.name", value = "org.apache.kafka.server.share.persister.DefaultStatePersister"), + new ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), Review Comment: The APIs are stable now. ########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -269,7 +270,7 @@ public void testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequ // shareFetch is not empty, and it contains tpId1, which should return FinalContext instance since it is FINAL_EPOCH List<TopicIdPartition> reqData3 = List.of(new TopicIdPartition(tpId1, new TopicPartition("foo", 0))); Review Comment: Again, `reqData2` I think. ########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -237,7 +238,7 @@ public void testNewContextReturnsFinalContextWithRequestData() { // shareFetch is not empty, but the maxBytes of topic partition is 0, which means this is added only for acknowledgements. // New context should be created successfully List<TopicIdPartition> reqData3 = List.of(new TopicIdPartition(tpId1, new TopicPartition("foo", 0))); Review Comment: This would be more sensibly named as `reqData2` I think. ########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -314,16 +315,16 @@ public void testNewContext() { // Test trying to create a new session with an invalid epoch assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, - new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true)); + new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true, CONNECTION_ID)); Review Comment: I would have thought that by definition, this would not be the same connection ID as the session which was previously added into the cache. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org