apoorvmittal10 commented on code in PR #18651: URL: https://github.com/apache/kafka/pull/18651#discussion_r1931143901
########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -2595,6 +2570,63 @@ public void testSharePartitionListenerOnBecomingFollower() { testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onBecomingFollower); } + @Test + public void testFetchMessagesRotatePartitions() { + String groupId = "grp"; + Uuid memberId1 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0)); + TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 1)); + TopicIdPartition tp4 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + TopicIdPartition tp5 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 2)); + TopicIdPartition tp6 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 3)); + LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, + tp0, tp1, tp2, tp3, tp4, tp5, tp6); + + SharePartitionManager sharePartitionManager = Mockito.spy(SharePartitionManagerBuilder.builder().build()); + // Capture the arguments passed to processShareFetch. + ArgumentCaptor<ShareFetch> captor = ArgumentCaptor.forClass(ShareFetch.class); + + sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 0, BATCH_SIZE, + partitionMaxBytes); + verify(sharePartitionManager, times(1)).processShareFetch(captor.capture()); + // Verify the partitions rotation, no rotation. + ShareFetch resultShareFetch = captor.getValue(); + validateRotatedMapEquals(resultShareFetch.partitionMaxBytes(), partitionMaxBytes, 0); + + // Single rotation. + sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 1, BATCH_SIZE, + partitionMaxBytes); + verify(sharePartitionManager, times(2)).processShareFetch(captor.capture()); + // Verify the partitions rotation, rotate by 1. + resultShareFetch = captor.getValue(); + validateRotatedMapEquals(partitionMaxBytes, resultShareFetch.partitionMaxBytes(), 1); + + // Rotation by 3, less that the number of partitions. + sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 3, BATCH_SIZE, + partitionMaxBytes); + verify(sharePartitionManager, times(3)).processShareFetch(captor.capture()); + // Verify the partitions rotation, rotate by 3. + resultShareFetch = captor.getValue(); + validateRotatedMapEquals(partitionMaxBytes, resultShareFetch.partitionMaxBytes(), 3); + + // Rotation by 12, more than the number of partitions. + sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 12, BATCH_SIZE, + partitionMaxBytes); + verify(sharePartitionManager, times(4)).processShareFetch(captor.capture()); + // Verify the partitions rotation, rotate by 5 (12 % 7). + resultShareFetch = captor.getValue(); + validateRotatedMapEquals(partitionMaxBytes, resultShareFetch.partitionMaxBytes(), 5); + + sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, Integer.MAX_VALUE, BATCH_SIZE, + partitionMaxBytes); + verify(sharePartitionManager, times(5)).processShareFetch(captor.capture()); + // Verify the partitions rotation, rotate by 1 (2147483647 % 7). + resultShareFetch = captor.getValue(); + validateRotatedMapEquals(partitionMaxBytes, resultShareFetch.partitionMaxBytes(), 1); Review Comment: I wanted to explicitly mention that rotation should happen at 1, hence wrote it. Though I have missed a comment on couple of lines above about the test, added that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org