apoorvmittal10 commented on code in PR #19148:
URL: https://github.com/apache/kafka/pull/19148#discussion_r1991048323


##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java:
##########
@@ -80,20 +81,11 @@ static LinkedHashMap<TopicIdPartition, Integer> 
rotateRoundRobin(
             return topicIdPartitions;
         }
 
-        // TODO: Once the partition max bytes is removed then the partition 
will be a linked list and rotation
-        //  will be a simple operation. Else consider using 
ImplicitLinkedHashCollection.
-        LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new 
LinkedHashMap<>(rotateAt);
-        LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new 
LinkedHashMap<>(topicIdPartitions.size());
-        int i = 0;
-        for (Map.Entry<TopicIdPartition, Integer> entry : 
topicIdPartitions.entrySet()) {
-            if (i < rotateAt) {
-                suffixPartitions.put(entry.getKey(), entry.getValue());
-            } else {
-                rotatedPartitions.put(entry.getKey(), entry.getValue());
-            }
-            i++;
-        }
-        rotatedPartitions.putAll(suffixPartitions);
+        // We don't want to modify the original list, hence created a copy.
+        List<TopicIdPartition> rotatedPartitions = new 
ArrayList<>(topicIdPartitions);
+        // We want the elements from the end of the list to move left by the 
distance provided i.e. if the original list is [1,2,3],
+        // and we want to rotate it by 1, we want the output as [2,3,1] and 
not [3,1,2]. Hence, we need negation of distance here.

Review Comment:
   ```suggestion
           // Elements from the list should move left by the distance provided 
i.e. if the original list is [1,2,3],
           // and rotation is by 1, then output should be [2,3,1] and not 
[3,1,2]. Hence, negate the distance here.
   ```



##########
server/src/test/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategyTest.java:
##########
@@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest {
     @Test
     public void testRoundRobinStrategy() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
-        LinkedHashMap<TopicIdPartition, Integer> partitions = 
createPartitions(3);
+        List<TopicIdPartition> partitions = createPartitions(3);
 
-        LinkedHashMap<TopicIdPartition, Integer> result = 
strategy.rotate(partitions, new PartitionRotateMetadata(1));
+        List<TopicIdPartition> result = strategy.rotate(partitions, new 
PartitionRotateMetadata(1));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 1);
+        validateRotatedListEquals(partitions, result, 1);
 
         // Session epoch is greater than the number of partitions.
         result = strategy.rotate(partitions, new PartitionRotateMetadata(5));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 2);
+        validateRotatedListEquals(partitions, result, 2);
 
         // Session epoch is at Integer.MAX_VALUE.
         result = strategy.rotate(partitions, new 
PartitionRotateMetadata(Integer.MAX_VALUE));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 1);
+        validateRotatedListEquals(partitions, result, 1);
 
         // No rotation at same size as epoch.
         result = strategy.rotate(partitions, new PartitionRotateMetadata(3));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 0);
+        validateRotatedListEquals(partitions, result, 0);
     }
 
     @Test
     public void testRoundRobinStrategyWithSpecialSessionEpochs() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
 
-        LinkedHashMap<TopicIdPartition, Integer> partitions = 
createPartitions(3);
-        LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(
+        List<TopicIdPartition> partitions = createPartitions(3);
+        List<TopicIdPartition> result = strategy.rotate(
             partitions,
             new PartitionRotateMetadata(ShareRequestMetadata.INITIAL_EPOCH));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 0);
+        validateRotatedListEquals(partitions, result, 0);
 
         result = strategy.rotate(
             partitions,
             new PartitionRotateMetadata(ShareRequestMetadata.FINAL_EPOCH));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 0);
+        validateRotatedListEquals(partitions, result, 0);
     }
 
     @Test
     public void testRoundRobinStrategyWithEmptyPartitions() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
         // Empty partitions.
-        LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(new 
LinkedHashMap<>(), new PartitionRotateMetadata(5));
+        List<TopicIdPartition> result = strategy.rotate(new ArrayList<>(), new 
PartitionRotateMetadata(5));
         // The result should be empty.
         assertTrue(result.isEmpty());
     }
 
     /**
-     * Create an ordered map of TopicIdPartition to partition max bytes.
+     * Create an ordered set of topic partitions.

Review Comment:
   ```suggestion
        * Create a list of topic partitions.
   ```



##########
server/src/test/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategyTest.java:
##########
@@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest {
     @Test
     public void testRoundRobinStrategy() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
-        LinkedHashMap<TopicIdPartition, Integer> partitions = 
createPartitions(3);
+        List<TopicIdPartition> partitions = createPartitions(3);
 
-        LinkedHashMap<TopicIdPartition, Integer> result = 
strategy.rotate(partitions, new PartitionRotateMetadata(1));
+        List<TopicIdPartition> result = strategy.rotate(partitions, new 
PartitionRotateMetadata(1));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 1);
+        validateRotatedListEquals(partitions, result, 1);
 
         // Session epoch is greater than the number of partitions.
         result = strategy.rotate(partitions, new PartitionRotateMetadata(5));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 2);
+        validateRotatedListEquals(partitions, result, 2);
 
         // Session epoch is at Integer.MAX_VALUE.
         result = strategy.rotate(partitions, new 
PartitionRotateMetadata(Integer.MAX_VALUE));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 1);
+        validateRotatedListEquals(partitions, result, 1);
 
         // No rotation at same size as epoch.
         result = strategy.rotate(partitions, new PartitionRotateMetadata(3));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 0);
+        validateRotatedListEquals(partitions, result, 0);
     }
 
     @Test
     public void testRoundRobinStrategyWithSpecialSessionEpochs() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
 
-        LinkedHashMap<TopicIdPartition, Integer> partitions = 
createPartitions(3);
-        LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(
+        List<TopicIdPartition> partitions = createPartitions(3);
+        List<TopicIdPartition> result = strategy.rotate(
             partitions,
             new PartitionRotateMetadata(ShareRequestMetadata.INITIAL_EPOCH));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 0);
+        validateRotatedListEquals(partitions, result, 0);
 
         result = strategy.rotate(
             partitions,
             new PartitionRotateMetadata(ShareRequestMetadata.FINAL_EPOCH));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 0);
+        validateRotatedListEquals(partitions, result, 0);
     }
 
     @Test
     public void testRoundRobinStrategyWithEmptyPartitions() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
         // Empty partitions.
-        LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(new 
LinkedHashMap<>(), new PartitionRotateMetadata(5));
+        List<TopicIdPartition> result = strategy.rotate(new ArrayList<>(), new 
PartitionRotateMetadata(5));
         // The result should be empty.
         assertTrue(result.isEmpty());
     }
 
     /**
-     * Create an ordered map of TopicIdPartition to partition max bytes.
+     * Create an ordered set of topic partitions.
      * @param size The number of topic-partitions to create.
-     * @return The ordered map of TopicIdPartition to partition max bytes.
+     * @return The ordered set of topic partitions.

Review Comment:
   ```suggestion
        * @return The list of topic partitions.
   ```



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java:
##########
@@ -80,20 +81,11 @@ static LinkedHashMap<TopicIdPartition, Integer> 
rotateRoundRobin(
             return topicIdPartitions;
         }
 
-        // TODO: Once the partition max bytes is removed then the partition 
will be a linked list and rotation
-        //  will be a simple operation. Else consider using 
ImplicitLinkedHashCollection.
-        LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new 
LinkedHashMap<>(rotateAt);
-        LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new 
LinkedHashMap<>(topicIdPartitions.size());
-        int i = 0;
-        for (Map.Entry<TopicIdPartition, Integer> entry : 
topicIdPartitions.entrySet()) {
-            if (i < rotateAt) {
-                suffixPartitions.put(entry.getKey(), entry.getValue());
-            } else {
-                rotatedPartitions.put(entry.getKey(), entry.getValue());
-            }
-            i++;
-        }
-        rotatedPartitions.putAll(suffixPartitions);
+        // We don't want to modify the original list, hence created a copy.

Review Comment:
   ```suggestion
           // Avoid modifying the original list, create copy.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to