apoorvmittal10 commented on code in PR #19148: URL: https://github.com/apache/kafka/pull/19148#discussion_r1991048323
########## server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java: ########## @@ -80,20 +81,11 @@ static LinkedHashMap<TopicIdPartition, Integer> rotateRoundRobin( return topicIdPartitions; } - // TODO: Once the partition max bytes is removed then the partition will be a linked list and rotation - // will be a simple operation. Else consider using ImplicitLinkedHashCollection. - LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new LinkedHashMap<>(rotateAt); - LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new LinkedHashMap<>(topicIdPartitions.size()); - int i = 0; - for (Map.Entry<TopicIdPartition, Integer> entry : topicIdPartitions.entrySet()) { - if (i < rotateAt) { - suffixPartitions.put(entry.getKey(), entry.getValue()); - } else { - rotatedPartitions.put(entry.getKey(), entry.getValue()); - } - i++; - } - rotatedPartitions.putAll(suffixPartitions); + // We don't want to modify the original list, hence created a copy. + List<TopicIdPartition> rotatedPartitions = new ArrayList<>(topicIdPartitions); + // We want the elements from the end of the list to move left by the distance provided i.e. if the original list is [1,2,3], + // and we want to rotate it by 1, we want the output as [2,3,1] and not [3,1,2]. Hence, we need negation of distance here. Review Comment: ```suggestion // Elements from the list should move left by the distance provided i.e. if the original list is [1,2,3], // and rotation is by 1, then output should be [2,3,1] and not [3,1,2]. Hence, negate the distance here. 
``` ########## server/src/test/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategyTest.java: ########## @@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest { @Test public void testRoundRobinStrategy() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); - LinkedHashMap<TopicIdPartition, Integer> partitions = createPartitions(3); + List<TopicIdPartition> partitions = createPartitions(3); - LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(partitions, new PartitionRotateMetadata(1)); + List<TopicIdPartition> result = strategy.rotate(partitions, new PartitionRotateMetadata(1)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 1); + validateRotatedListEquals(partitions, result, 1); // Session epoch is greater than the number of partitions. result = strategy.rotate(partitions, new PartitionRotateMetadata(5)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 2); + validateRotatedListEquals(partitions, result, 2); // Session epoch is at Integer.MAX_VALUE. result = strategy.rotate(partitions, new PartitionRotateMetadata(Integer.MAX_VALUE)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 1); + validateRotatedListEquals(partitions, result, 1); // No rotation at same size as epoch. 
result = strategy.rotate(partitions, new PartitionRotateMetadata(3)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 0); + validateRotatedListEquals(partitions, result, 0); } @Test public void testRoundRobinStrategyWithSpecialSessionEpochs() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); - LinkedHashMap<TopicIdPartition, Integer> partitions = createPartitions(3); - LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate( + List<TopicIdPartition> partitions = createPartitions(3); + List<TopicIdPartition> result = strategy.rotate( partitions, new PartitionRotateMetadata(ShareRequestMetadata.INITIAL_EPOCH)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 0); + validateRotatedListEquals(partitions, result, 0); result = strategy.rotate( partitions, new PartitionRotateMetadata(ShareRequestMetadata.FINAL_EPOCH)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 0); + validateRotatedListEquals(partitions, result, 0); } @Test public void testRoundRobinStrategyWithEmptyPartitions() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); // Empty partitions. - LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(new LinkedHashMap<>(), new PartitionRotateMetadata(5)); + List<TopicIdPartition> result = strategy.rotate(new ArrayList<>(), new PartitionRotateMetadata(5)); // The result should be empty. assertTrue(result.isEmpty()); } /** - * Create an ordered map of TopicIdPartition to partition max bytes. + * Create an ordered set of topic partitions. Review Comment: ```suggestion * Create a list of topic partitions. 
``` ########## server/src/test/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategyTest.java: ########## @@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest { @Test public void testRoundRobinStrategy() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); - LinkedHashMap<TopicIdPartition, Integer> partitions = createPartitions(3); + List<TopicIdPartition> partitions = createPartitions(3); - LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(partitions, new PartitionRotateMetadata(1)); + List<TopicIdPartition> result = strategy.rotate(partitions, new PartitionRotateMetadata(1)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 1); + validateRotatedListEquals(partitions, result, 1); // Session epoch is greater than the number of partitions. result = strategy.rotate(partitions, new PartitionRotateMetadata(5)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 2); + validateRotatedListEquals(partitions, result, 2); // Session epoch is at Integer.MAX_VALUE. result = strategy.rotate(partitions, new PartitionRotateMetadata(Integer.MAX_VALUE)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 1); + validateRotatedListEquals(partitions, result, 1); // No rotation at same size as epoch. 
result = strategy.rotate(partitions, new PartitionRotateMetadata(3)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 0); + validateRotatedListEquals(partitions, result, 0); } @Test public void testRoundRobinStrategyWithSpecialSessionEpochs() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); - LinkedHashMap<TopicIdPartition, Integer> partitions = createPartitions(3); - LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate( + List<TopicIdPartition> partitions = createPartitions(3); + List<TopicIdPartition> result = strategy.rotate( partitions, new PartitionRotateMetadata(ShareRequestMetadata.INITIAL_EPOCH)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 0); + validateRotatedListEquals(partitions, result, 0); result = strategy.rotate( partitions, new PartitionRotateMetadata(ShareRequestMetadata.FINAL_EPOCH)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 0); + validateRotatedListEquals(partitions, result, 0); } @Test public void testRoundRobinStrategyWithEmptyPartitions() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); // Empty partitions. - LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(new LinkedHashMap<>(), new PartitionRotateMetadata(5)); + List<TopicIdPartition> result = strategy.rotate(new ArrayList<>(), new PartitionRotateMetadata(5)); // The result should be empty. assertTrue(result.isEmpty()); } /** - * Create an ordered map of TopicIdPartition to partition max bytes. + * Create an ordered set of topic partitions. * @param size The number of topic-partitions to create. - * @return The ordered map of TopicIdPartition to partition max bytes. + * @return The ordered set of topic partitions. Review Comment: ```suggestion * @return The list of topic partitions. 
``` ########## server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java: ########## @@ -80,20 +81,11 @@ static LinkedHashMap<TopicIdPartition, Integer> rotateRoundRobin( return topicIdPartitions; } - // TODO: Once the partition max bytes is removed then the partition will be a linked list and rotation - // will be a simple operation. Else consider using ImplicitLinkedHashCollection. - LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new LinkedHashMap<>(rotateAt); - LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new LinkedHashMap<>(topicIdPartitions.size()); - int i = 0; - for (Map.Entry<TopicIdPartition, Integer> entry : topicIdPartitions.entrySet()) { - if (i < rotateAt) { - suffixPartitions.put(entry.getKey(), entry.getValue()); - } else { - rotatedPartitions.put(entry.getKey(), entry.getValue()); - } - i++; - } - rotatedPartitions.putAll(suffixPartitions); + // We don't want to modify the original list, hence created a copy. Review Comment: ```suggestion // Avoid modifying the original list; create a copy. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org