apoorvmittal10 commented on code in PR #19148: URL: https://github.com/apache/kafka/pull/19148#discussion_r1986418726
########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -258,14 +257,14 @@ public CompletableFuture<Map<TopicIdPartition, PartitionData>> fetchMessages( FetchParams fetchParams, int sessionEpoch, int batchSize, - LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes + ArrayList<TopicIdPartition> topicPartitions Review Comment: ```suggestion List<TopicIdPartition> topicIdPartitions ``` ########## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java: ########## @@ -526,7 +518,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() { TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); TopicIdPartition tp2 = new TopicIdPartition(topicId, new TopicPartition("foo", 2)); - LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes1 = orderedMap(PARTITION_MAX_BYTES, tp0, tp1); + ArrayList<TopicIdPartition> topicIdPartitions1 = arrayList(tp0, tp1); Review Comment: ```suggestion List<TopicIdPartition> topicIdPartitions1 = List.of(tp0, tp1); ``` ########## server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java: ########## @@ -48,7 +47,7 @@ public String toString() { * * @return the rotated topicIdPartitions */ - LinkedHashMap<TopicIdPartition, Integer> rotate(LinkedHashMap<TopicIdPartition, Integer> topicIdPartitions, PartitionRotateMetadata metadata); + ArrayList<TopicIdPartition> rotate(ArrayList<TopicIdPartition> topicIdPartitions, PartitionRotateMetadata metadata); Review Comment: ```suggestion List<TopicIdPartition> rotate(List<TopicIdPartition> topicIdPartitions, PartitionRotateMetadata metadata); ``` ########## clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java: ########## @@ -151,7 +149,7 @@ public String toString() { } private final ShareFetchRequestData data; - private volatile LinkedHashMap<TopicIdPartition, 
ShareFetchRequest.SharePartitionData> shareFetchData = null; + private volatile ArrayList<TopicIdPartition> shareFetchData = null; Review Comment: ```suggestion private volatile List<TopicIdPartition> shareFetchData = null; ``` ########## server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java: ########## @@ -64,8 +63,8 @@ static PartitionRotateStrategy type(StrategyType type) { * * @return the rotated topicIdPartitions */ - static LinkedHashMap<TopicIdPartition, Integer> rotateRoundRobin( - LinkedHashMap<TopicIdPartition, Integer> topicIdPartitions, + static ArrayList<TopicIdPartition> rotateRoundRobin( Review Comment: ```suggestion static List<TopicIdPartition> rotateRoundRobin( ``` ########## server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java: ########## @@ -110,25 +109,20 @@ public synchronized LastUsedKey lastUsedKey() { return new LastUsedKey(key, lastUsedMs); } - // Visible for testing - public synchronized long creationMs() { - return creationMs; - } - // Update the cached partition data based on the request. - public synchronized Map<ModifiedTopicIdPartitionType, List<TopicIdPartition>> update(Map<TopicIdPartition, - ShareFetchRequest.SharePartitionData> shareFetchData, List<TopicIdPartition> toForget) { + public synchronized Map<ModifiedTopicIdPartitionType, List<TopicIdPartition>> update( + List<TopicIdPartition> shareFetchData, + List<TopicIdPartition> toForget) { Review Comment: ```suggestion List<TopicIdPartition> toForget ) { ``` ########## server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java: ########## @@ -80,20 +79,18 @@ static LinkedHashMap<TopicIdPartition, Integer> rotateRoundRobin( return topicIdPartitions; } - // TODO: Once the partition max bytes is removed then the partition will be a linked list and rotation - // will be a simple operation. Else consider using ImplicitLinkedHashCollection. 
- LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new LinkedHashMap<>(rotateAt); - LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new LinkedHashMap<>(topicIdPartitions.size()); + ArrayList<TopicIdPartition> suffixPartitions = new ArrayList<>(rotateAt); + ArrayList<TopicIdPartition> rotatedPartitions = new ArrayList<>(topicIdPartitions.size()); int i = 0; - for (Map.Entry<TopicIdPartition, Integer> entry : topicIdPartitions.entrySet()) { + for (TopicIdPartition topicIdPartition : topicIdPartitions) { if (i < rotateAt) { - suffixPartitions.put(entry.getKey(), entry.getValue()); + suffixPartitions.add(topicIdPartition); } else { - rotatedPartitions.put(entry.getKey(), entry.getValue()); + rotatedPartitions.add(topicIdPartition); } i++; } - rotatedPartitions.putAll(suffixPartitions); + rotatedPartitions.addAll(suffixPartitions); Review Comment: Should we use `Collections.rotate` instead? ########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -427,29 +426,20 @@ private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Par /** * The newContext method is used to create a new share fetch context for every share fetch request. * @param groupId The group id in the share fetch request. - * @param shareFetchData The topic-partitions and their corresponding maxBytes data in the share fetch request. + * @param shareFetchData The topic-partitions in the share fetch request. * @param toForget The topic-partitions to forget present in the share fetch request. * @param reqMetadata The metadata in the share fetch request. 
* @param isAcknowledgeDataPresent This tells whether the fetch request received includes piggybacked acknowledgements or not * @return The new share fetch context object */ - public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData, + public ShareFetchContext newContext(String groupId, List<TopicIdPartition> shareFetchData, List<TopicIdPartition> toForget, ShareRequestMetadata reqMetadata, Boolean isAcknowledgeDataPresent) { ShareFetchContext context; - // TopicPartition with maxBytes as 0 should not be added in the cachedPartitions - Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataWithMaxBytes = new HashMap<>(); - shareFetchData.forEach((tp, sharePartitionData) -> { - if (sharePartitionData.maxBytes > 0) shareFetchDataWithMaxBytes.put(tp, sharePartitionData); - }); // If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should remove the existing sessions. Also, start a // new session in case it is INITIAL_EPOCH. Hence, we need to treat them as special cases. if (reqMetadata.isFull()) { ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId()); if (reqMetadata.epoch() == ShareRequestMetadata.FINAL_EPOCH) { - // If the epoch is FINAL_EPOCH, don't try to create a new session. - if (!shareFetchDataWithMaxBytes.isEmpty()) { - throw Errors.INVALID_REQUEST.exception(); - } Review Comment: Shouldn't we still have a similar check, i.e. for the case where topic partitions are included in a fetch request with the final epoch? ########## server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java: ########## @@ -110,25 +109,20 @@ public synchronized LastUsedKey lastUsedKey() { return new LastUsedKey(key, lastUsedMs); } - // Visible for testing - public synchronized long creationMs() { - return creationMs; - } Review Comment: Why have we removed `creationMs()`? Is it no longer being used? 
########## server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java: ########## @@ -30,34 +30,30 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.LinkedHashMap; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import static org.apache.kafka.test.TestUtils.tempFile; import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; /** * Helper functions for writing share fetch unit tests. */ public class ShareFetchTestUtils { /** - * Create an ordered map of TopicIdPartition to partition max bytes. + * Create an ArrayList of topic partitions. * - * @param partitionMaxBytes The maximum number of bytes that can be fetched for each partition. - * @param topicIdPartitions The topic partitions to create the map for. - * @return The ordered map of TopicIdPartition to partition max bytes. + * @param topicIdPartitions The topic partitions to create the list for. + * @return The list of topic partitions. */ - public static LinkedHashMap<TopicIdPartition, Integer> orderedMap(int partitionMaxBytes, TopicIdPartition... topicIdPartitions) { - LinkedHashMap<TopicIdPartition, Integer> map = new LinkedHashMap<>(); - for (TopicIdPartition tp : topicIdPartitions) { - map.put(tp, partitionMaxBytes); - } - return map; + public static ArrayList<TopicIdPartition> arrayList(TopicIdPartition... topicIdPartitions) { + ArrayList<TopicIdPartition> list = new ArrayList<>(); + Collections.addAll(list, topicIdPartitions); + return list; } Review Comment: Why not use `List.of()`? Do we need a mutable list? 
########## clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java: ########## @@ -226,23 +189,18 @@ public int maxWait() { return data.maxWaitMs(); } - public Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData(Map<Uuid, String> topicNames) { + public List<TopicIdPartition> shareFetchData(Map<Uuid, String> topicNames) { if (shareFetchData == null) { synchronized (this) { if (shareFetchData == null) { // Assigning the lazy-initialized `shareFetchData` in the last step // to avoid other threads accessing a half-initialized object. - final LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchDataTmp = new LinkedHashMap<>(); + final ArrayList<TopicIdPartition> shareFetchDataTmp = new ArrayList<>(); Review Comment: ```suggestion final List<TopicIdPartition> shareFetchDataTmp = new ArrayList<>(); ``` ########## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java: ########## @@ -134,7 +132,7 @@ public void testDelayedShareFetchTryCompleteReturnsFalseDueToNonAcquirablePartit sharePartitions.put(tp1, sp1); ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), - new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, + new CompletableFuture<>(), arrayList(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, Review Comment: ```suggestion new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, ``` ########## clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java: ########## @@ -151,7 +149,7 @@ public String toString() { } private final ShareFetchRequestData data; - private volatile LinkedHashMap<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData = null; + private volatile ArrayList<TopicIdPartition> shareFetchData = null; Review Comment: Is there a reason to use `ArrayList` specifically rather than declaring the field with the `List` interface? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org