chirag-wadhwa5 commented on code in PR #19778: URL: https://github.com/apache/kafka/pull/19778#discussion_r2104316173
########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -372,6 +459,110 @@ public void testNewContext() { assertEquals(0, cache.size()); } + @Test + public void testAcknowledgeSessionUpdateThrowsOnInitialEpoch() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + assertThrows(InvalidShareSessionEpochException.class, + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH))); + } + + @Test + public void testAcknowledgeSessionUpdateThrowsWhenShareSessionNotFound() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + // The share session corresponding to this memberId has not been created yet. This should throw an exception. + assertThrows(ShareSessionNotFoundException.class, + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))); + } + + @Test + public void testAcknowledgeSessionUpdateThrowsInvalidShareSessionEpochException() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + Uuid tpId0 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1)); + + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + + // Create a new share session with an initial share fetch request + ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST, + new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID); + 
assertInstanceOf(ShareSessionContext.class, context1); + assertFalse(((ShareSessionContext) context1).isSubsequent()); + + // The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception. Review Comment: Thanks for the review. I understand that there are other places where I forgot to update the log, but I believe the place referred to here is fine. The code does the following -> `ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))`. `ShareRequestMetadata.nextEpoch()` is called twice, thereby increasing the epoch from 0 to 2. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org