apoorvmittal10 commented on code in PR #18651:
URL: https://github.com/apache/kafka/pull/18651#discussion_r1931143901


##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2595,6 +2570,63 @@ public void testSharePartitionListenerOnBecomingFollower() {
         testSharePartitionListener(sharePartitionKey, partitionCacheMap, mockReplicaManager, partitionListener::onBecomingFollower);
     }
 
+    @Test
+    public void testFetchMessagesRotatePartitions() {
+        String groupId = "grp";
+        Uuid memberId1 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
+        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 1));
+        TopicIdPartition tp4 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
+        TopicIdPartition tp5 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 2));
+        TopicIdPartition tp6 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 3));
+        LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES,
+            tp0, tp1, tp2, tp3, tp4, tp5, tp6);
+
+        SharePartitionManager sharePartitionManager = Mockito.spy(SharePartitionManagerBuilder.builder().build());
+        // Capture the arguments passed to processShareFetch.
+        ArgumentCaptor<ShareFetch> captor = ArgumentCaptor.forClass(ShareFetch.class);
+
+        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 0, BATCH_SIZE,
+            partitionMaxBytes);
+        verify(sharePartitionManager, times(1)).processShareFetch(captor.capture());
+        // Verify the partitions rotation, no rotation.
+        ShareFetch resultShareFetch = captor.getValue();
+        validateRotatedMapEquals(resultShareFetch.partitionMaxBytes(), partitionMaxBytes, 0);
+
+        // Single rotation.
+        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 1, BATCH_SIZE,
+            partitionMaxBytes);
+        verify(sharePartitionManager, times(2)).processShareFetch(captor.capture());
+        // Verify the partitions rotation, rotate by 1.
+        resultShareFetch = captor.getValue();
+        validateRotatedMapEquals(partitionMaxBytes, resultShareFetch.partitionMaxBytes(), 1);
+
+        // Rotation by 3, less than the number of partitions.
+        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 3, BATCH_SIZE,
+            partitionMaxBytes);
+        verify(sharePartitionManager, times(3)).processShareFetch(captor.capture());
+        // Verify the partitions rotation, rotate by 3.
+        resultShareFetch = captor.getValue();
+        validateRotatedMapEquals(partitionMaxBytes, resultShareFetch.partitionMaxBytes(), 3);
+
+        // Rotation by 12, more than the number of partitions.
+        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 12, BATCH_SIZE,
+            partitionMaxBytes);
+        verify(sharePartitionManager, times(4)).processShareFetch(captor.capture());
+        // Verify the partitions rotation, rotate by 5 (12 % 7).
+        resultShareFetch = captor.getValue();
+        validateRotatedMapEquals(partitionMaxBytes, resultShareFetch.partitionMaxBytes(), 5);
+
+        sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, Integer.MAX_VALUE, BATCH_SIZE,
+            partitionMaxBytes);
+        verify(sharePartitionManager, times(5)).processShareFetch(captor.capture());
+        // Verify the partitions rotation, rotate by 1 (2147483647 % 7).
+        resultShareFetch = captor.getValue();
+        validateRotatedMapEquals(partitionMaxBytes, resultShareFetch.partitionMaxBytes(), 1);

Review Comment:
   I wanted to state explicitly that the rotation should happen at 1, hence I 
wrote it out. I had, however, missed a comment describing the test a couple of 
lines above; I have added it now.
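
For readers following the arithmetic in the test comments (12 % 7 = 5 and 2147483647 % 7 = 1), here is a minimal standalone sketch of rotating an ordered map by an offset taken modulo its size. It is not the PR's implementation: `RotationSketch` and `rotate` are hypothetical names, and the direction of rotation (the entry at index `rotateAt % size` becomes the first entry) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RotationSketch {

    // Hypothetical helper, not Kafka's code: returns a copy of `map` whose
    // iteration order starts at index (rotateAt % size) and wraps around.
    static <K, V> LinkedHashMap<K, V> rotate(LinkedHashMap<K, V> map, int rotateAt) {
        LinkedHashMap<K, V> rotated = new LinkedHashMap<>();
        if (map.isEmpty()) {
            return rotated;
        }
        List<Map.Entry<K, V>> entries = new ArrayList<>(map.entrySet());
        int size = entries.size();
        // floorMod keeps the start index in [0, size), even for negative input.
        int start = Math.floorMod(rotateAt, size);
        for (int i = 0; i < size; i++) {
            Map.Entry<K, V> entry = entries.get((start + i) % size);
            rotated.put(entry.getKey(), entry.getValue());
        }
        return rotated;
    }

    public static void main(String[] args) {
        // Seven placeholder partitions, mirroring tp0..tp6 in the test.
        LinkedHashMap<String, Integer> partitions = new LinkedHashMap<>();
        for (int i = 0; i < 7; i++) {
            partitions.put("tp" + i, 1024);
        }
        // No rotation: prints [tp0, tp1, tp2, tp3, tp4, tp5, tp6]
        System.out.println(rotate(partitions, 0).keySet());
        // 12 % 7 = 5: prints [tp5, tp6, tp0, tp1, tp2, tp3, tp4]
        System.out.println(rotate(partitions, 12).keySet());
        // 2147483647 % 7 = 1: prints [tp1, tp2, tp3, tp4, tp5, tp6, tp0]
        System.out.println(rotate(partitions, Integer.MAX_VALUE).keySet());
    }
}
```

Under these assumptions, rotating by `Integer.MAX_VALUE` gives the same order as rotating by 1, which is why the last case in the test benefits from an explicit comment.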



