chirag-wadhwa5 commented on code in PR #19778:
URL: https://github.com/apache/kafka/pull/19778#discussion_r2104316173


##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -372,6 +459,110 @@ public void testNewContext() {
         assertEquals(0, cache.size());
     }
 
+    @Test
+    public void testAcknowledgeSessionUpdateThrowsOnInitialEpoch() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        assertThrows(InvalidShareSessionEpochException.class,
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(Uuid.randomUuid(), 
ShareRequestMetadata.INITIAL_EPOCH)));
+    }
+
+    @Test
+    public void testAcknowledgeSessionUpdateThrowsWhenShareSessionNotFound() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        // The share session corresponding to this memberId has not been 
created yet. This should throw an exception.
+        assertThrows(ShareSessionNotFoundException.class,
+            () -> sharePartitionManager.acknowledgeSessionUpdate("grp",
+                new ShareRequestMetadata(Uuid.randomUuid(), 
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))));
+    }
+
+    @Test
+    public void 
testAcknowledgeSessionUpdateThrowsInvalidShareSessionEpochException() {
+        ShareSessionCache cache = new ShareSessionCache(10);
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withCache(cache)
+            .build();
+
+        Uuid tpId0 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new 
TopicPartition("foo", 1));
+
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        // Create a new share session with an initial share fetch request
+        ShareFetchContext context1 = sharePartitionManager.newContext(groupId, 
List.of(tp0, tp1), EMPTY_PART_LIST,
+            new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID);
+        assertInstanceOf(ShareSessionContext.class, context1);
+        assertFalse(((ShareSessionContext) context1).isSubsequent());
+
+        // The expected epoch from the share session should be 1, but we are 
passing 2. This should throw an exception.

Review Comment:
   Thanks for the review. I understand the other places I forgot to update the 
log, but I believe in this referred place it is fine. The code does the 
following ->
   
`ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))`
   
   ShareRequestMetadata.nextEpoch() is called twice, thereby increasing epoch 
from 0 to 2.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to