AndrewJSchofield commented on code in PR #19329:
URL: https://github.com/apache/kafka/pull/19329#discussion_r2063614759


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2065,6 +2066,67 @@ public void testComplexShareConsumer() throws Exception {
         verifyShareGroupStateTopicRecordsProduced();
     }
 
+    @ClusterTest(
+        brokers = 1,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
+            @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.max.size", value = "3") // Setting max group size to 3
+        }
+    )
+    public void testShareGroupMaxSizeConfigExceeded() throws Exception {
+        alterShareAutoOffsetReset("group1", "earliest");
+        // Creating 3 consumers in group1
+        ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer("group1");
+        ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer("group1");
+        ShareConsumer<byte[], byte[]> shareConsumer3 = createShareConsumer("group1");
+
+        shareConsumer1.subscribe(Set.of(tp.topic()));
+        shareConsumer2.subscribe(Set.of(tp.topic()));
+        shareConsumer3.subscribe(Set.of(tp.topic()));
+
+        produceMessages(1);

Review Comment:
   What's the point of producing the records here? If you're just checking the 
max group size, subscribing and polling is enough. And then you don't need to 
alter the auto-offset reset config because you wouldn't be consuming, which 
would make the test run much more quickly.
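   A minimal sketch of what the slimmed-down test body could look like (reusing the consumers and the `tp` fixture from the surrounding test; `java.time.Duration` would need to be imported, and this only shows the join path, not whatever assertion the test makes about the rejected member):

   ```java
   shareConsumer1.subscribe(Set.of(tp.topic()));
   shareConsumer2.subscribe(Set.of(tp.topic()));
   shareConsumer3.subscribe(Set.of(tp.topic()));

   // Polling is enough for each member to join the group, so no produceMessages()
   // and no alterShareAutoOffsetReset() are needed because nothing is consumed.
   shareConsumer1.poll(Duration.ofMillis(5000));
   shareConsumer2.poll(Duration.ofMillis(5000));
   shareConsumer3.poll(Duration.ofMillis(5000));
   ```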



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2065,6 +2066,67 @@ public void testComplexShareConsumer() throws Exception {
         verifyShareGroupStateTopicRecordsProduced();
     }
 
+    @ClusterTest(
+        brokers = 1,
+        serverProperties = {
+            @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+            @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+            @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
+            @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
+            @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "3"),
+            @ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
+            @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
+            @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),

Review Comment:
   The APIs are now stable, so this config is no longer needed.



##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -2052,12 +2052,99 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
    shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
    metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
    shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
-    shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
+    shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)

    shareFetchResponseData = shareFetchResponse.data()
    assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareFetchResponseData.errorCode)
   }
 
+  @ClusterTests(
+    Array(
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = "true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+          new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"),
+          new ClusterConfigProperty(key = "group.share.max.size", value="2")
+        )
+      ),
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = "true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+          new ClusterConfigProperty(key = "group.share.persister.class.name", value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
+          new ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
+          new ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"),
+          new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),
+          new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"),
+          new ClusterConfigProperty(key = "group.share.max.size", value="2")
+        )
+      ),
+    )
+  )
+  def testShareSessionEvictedOnConnectionDrop(): Unit = {
+    val groupId: String = "group"
+    val memberId1 = Uuid.randomUuid()
+    val memberId2 = Uuid.randomUuid()
+    val memberId3 = Uuid.randomUuid()
+
+    val topic = "topic"
+    val partition = 0
+
+    createTopicAndReturnLeaders(topic, numPartitions = 3)
+    val topicIds = getTopicIds.asJava
+    val topicId = topicIds.get(topic)
+    val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition))
+
+    val send: Seq[TopicIdPartition] = Seq(topicIdPartition)
+
+    // member1 sends a share fetch request to register its share session. Note it does not close the socket connection afterwards.
+    TestUtils.waitUntilTrue(() => {
+      val metadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH)
+      val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
+      val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
+      val shareFetchResponseData = shareFetchResponse.data()
+      shareFetchResponseData.errorCode == Errors.NONE.code
+    }, "Share fetch request failed", 5000)
+
+    // member2 sends a share fetch request to register its share session. Note it does not close the socket connection afterwards.
+    TestUtils.waitUntilTrue(() => {
+      val metadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH)
+      val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
+      val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
+      val shareFetchResponseData = shareFetchResponse.data()
+      shareFetchResponseData.errorCode == Errors.NONE.code
+    }, "Share fetch request failed", 5000)
+
+    // member3 sends a share fetch request to register its share session. Since the maximum number of share sessions that could
+    // exist in the share session cache is 2 (group.share.max.share.sessions), the attempt to register a third
+    // share session with the ShareSessionCache would throw SHARE_SESSION_NOT_FOUND

Review Comment:
   This is actually `SHARE_SESSION_LIMIT_REACHED` according to the KIP.



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -237,7 +238,7 @@ public void testNewContextReturnsFinalContextWithRequestData() {
         // shareFetch is not empty, but the maxBytes of topic partition is 0, which means this is added only for acknowledgements.

Review Comment:
   Haven't we removed partitionMaxBytes?



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -423,8 +423,14 @@ private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Par
      * @param isAcknowledgeDataPresent This tells whether the fetch request received includes piggybacked acknowledgements or not

Review Comment:
   nit: Missing parameter for `clientConnectionId` in javadoc comment.
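   Something like this would fit, assuming the new parameter is named `clientConnectionId` as in the changed signature (the description wording is just a suggestion):

   ```java
   * @param clientConnectionId The id of the client connection on which the share fetch request was received
   ```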



##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -2052,12 +2052,99 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
    shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
    metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch)
    shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty)
-    shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)
+    shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)

    shareFetchResponseData = shareFetchResponse.data()
    assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareFetchResponseData.errorCode)
   }
 
+  @ClusterTests(
+    Array(
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = "true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+          new ClusterConfigProperty(key = "group.share.max.share.sessions", value="2"),
+          new ClusterConfigProperty(key = "group.share.max.size", value="2")
+        )
+      ),
+      new ClusterTest(
+        serverProperties = Array(
+          new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
+          new ClusterConfigProperty(key = "group.share.enable", value = "true"),
+          new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+          new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+          new ClusterConfigProperty(key = "group.share.persister.class.name", value = "org.apache.kafka.server.share.persister.DefaultStatePersister"),
+          new ClusterConfigProperty(key = "share.coordinator.state.topic.replication.factor", value = "1"),
+          new ClusterConfigProperty(key = "share.coordinator.state.topic.num.partitions", value = "1"),
+          new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),

Review Comment:
   The APIs are stable now.



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -269,7 +270,7 @@ public void testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequ
         // shareFetch is not empty, and it contains tpId1, which should return FinalContext instance since it is FINAL_EPOCH
         List<TopicIdPartition> reqData3 = List.of(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)));

Review Comment:
   Again, `reqData2` I think.



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -237,7 +238,7 @@ public void testNewContextReturnsFinalContextWithRequestData() {
         // shareFetch is not empty, but the maxBytes of topic partition is 0, which means this is added only for acknowledgements.
         // New context should be created successfully
         List<TopicIdPartition> reqData3 = List.of(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)));

Review Comment:
   This would be more sensibly named as `reqData2` I think.
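   i.e. just the rename, everything else unchanged:

   ```java
   List<TopicIdPartition> reqData2 = List.of(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)));
   ```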



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -314,16 +315,16 @@ public void testNewContext() {
 
         // Test trying to create a new session with an invalid epoch
         assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
-            new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true));
+            new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true, CONNECTION_ID));

Review Comment:
   I would have thought that by definition, this would not be the same 
connection ID as the session which was previously added into the cache.
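   For example, passing an id that is deliberately different from `CONNECTION_ID` would make that explicit (the literal below is hypothetical, not from this PR):

   ```java
   assertThrows(InvalidShareSessionEpochException.class, () -> sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST,
       new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true, "some-other-connection-id"));
   ```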


