chirag-wadhwa5 commented on code in PR #19778:
URL: https://github.com/apache/kafka/pull/19778#discussion_r2111297290


##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -276,6 +278,91 @@ public void 
testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequ
             sharePartitionManager.newContext(groupId, reqData2, List.of(), 
reqMetadata2, true, CONNECTION_ID));
     }
 
+    @Test
+    public void 
testNewContextThrowsErrorWhenShareSessionNotFoundOnFinalEpoch() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+        assertThrows(ShareSessionNotFoundException.class, () -> 
sharePartitionManager.newContext("grp", List.of(), List.of(),
+            new ShareRequestMetadata(Uuid.randomUuid(), 
ShareRequestMetadata.FINAL_EPOCH), false, CONNECTION_ID));
+    }
+
+    @Test
+    public void 
testNewContextThrowsErrorWhenAcknowledgeDataPresentOnInitialEpoch() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+        Uuid tpId0 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 1));
+
+        assertThrows(InvalidRequestException.class, () -> 
sharePartitionManager.newContext("grp", List.of(tp0, tp1), List.of(),
+            new ShareRequestMetadata(Uuid.randomUuid(), 
ShareRequestMetadata.INITIAL_EPOCH), true, CONNECTION_ID));
+    }
+
+    @Test
+    public void 
testNewContextThrowsErrorWhenShareSessionCacheIsFullOnInitialEpoch() {
+        // Define a cache with max size 1
+        ShareSessionCache cache = new ShareSessionCache(1);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        Uuid tpId0 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 1));
+
+        String groupId = "grp";
+        Uuid memberId1 = Uuid.randomUuid();
+        Uuid memberId2 = Uuid.randomUuid();
+
+        // Create a new share session with an initial share fetch request
+        List<TopicIdPartition> reqData = List.of(tp0, tp1);
+
+        ShareRequestMetadata reqMetadata1 = new 
ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH);
+        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, 
reqData, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);

Review Comment:
   Thanks for the review. I have changed the entire file to use EMTY_PART_LIST.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to