apoorvmittal10 commented on code in PR #19148:
URL: https://github.com/apache/kafka/pull/19148#discussion_r1986418726


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -258,14 +257,14 @@ public CompletableFuture<Map<TopicIdPartition, 
PartitionData>> fetchMessages(
         FetchParams fetchParams,
         int sessionEpoch,
         int batchSize,
-        LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes
+        ArrayList<TopicIdPartition> topicPartitions

Review Comment:
   ```suggestion
           List<TopicIdPartition> topicIdPartitions
   ```



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -526,7 +518,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
         TopicIdPartition tp0 = new TopicIdPartition(topicId, new 
TopicPartition("foo", 0));
         TopicIdPartition tp1 = new TopicIdPartition(topicId, new 
TopicPartition("foo", 1));
         TopicIdPartition tp2 = new TopicIdPartition(topicId, new 
TopicPartition("foo", 2));
-        LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes1 = 
orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
+        ArrayList<TopicIdPartition> topicIdPartitions1 = arrayList(tp0, tp1);

Review Comment:
   ```suggestion
           List<TopicIdPartition> topicIdPartitions1 = List.of(tp0, tp1);
   ```



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java:
##########
@@ -48,7 +47,7 @@ public String toString() {
      *
      * @return the rotated topicIdPartitions
      */
-    LinkedHashMap<TopicIdPartition, Integer> 
rotate(LinkedHashMap<TopicIdPartition, Integer> topicIdPartitions, 
PartitionRotateMetadata metadata);
+    ArrayList<TopicIdPartition> rotate(ArrayList<TopicIdPartition> 
topicIdPartitions, PartitionRotateMetadata metadata);

Review Comment:
   ```suggestion
       List<TopicIdPartition> rotate(List<TopicIdPartition> topicIdPartitions, 
PartitionRotateMetadata metadata);
   ```



##########
clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java:
##########
@@ -151,7 +149,7 @@ public String toString() {
     }
 
     private final ShareFetchRequestData data;
-    private volatile LinkedHashMap<TopicIdPartition, 
ShareFetchRequest.SharePartitionData> shareFetchData = null;
+    private volatile ArrayList<TopicIdPartition> shareFetchData = null;

Review Comment:
   ```suggestion
       private volatile List<TopicIdPartition> shareFetchData = null;
   ```



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java:
##########
@@ -64,8 +63,8 @@ static PartitionRotateStrategy type(StrategyType type) {
      *
      * @return the rotated topicIdPartitions
      */
-    static LinkedHashMap<TopicIdPartition, Integer> rotateRoundRobin(
-        LinkedHashMap<TopicIdPartition, Integer> topicIdPartitions,
+    static ArrayList<TopicIdPartition> rotateRoundRobin(

Review Comment:
   ```suggestion
       static List<TopicIdPartition> rotateRoundRobin(
   ```



##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java:
##########
@@ -110,25 +109,20 @@ public synchronized LastUsedKey lastUsedKey() {
         return new LastUsedKey(key, lastUsedMs);
     }
 
-    // Visible for testing
-    public synchronized long creationMs() {
-        return creationMs;
-    }
-
     // Update the cached partition data based on the request.
-    public synchronized Map<ModifiedTopicIdPartitionType, 
List<TopicIdPartition>> update(Map<TopicIdPartition,
-            ShareFetchRequest.SharePartitionData> shareFetchData, 
List<TopicIdPartition> toForget) {
+    public synchronized Map<ModifiedTopicIdPartitionType, 
List<TopicIdPartition>> update(
+        List<TopicIdPartition> shareFetchData,
+        List<TopicIdPartition> toForget) {

Review Comment:
   ```suggestion
           List<TopicIdPartition> toForget
     ) {
   ```



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java:
##########
@@ -80,20 +79,18 @@ static LinkedHashMap<TopicIdPartition, Integer> 
rotateRoundRobin(
             return topicIdPartitions;
         }
 
-        // TODO: Once the partition max bytes is removed then the partition 
will be a linked list and rotation
-        //  will be a simple operation. Else consider using 
ImplicitLinkedHashCollection.
-        LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new 
LinkedHashMap<>(rotateAt);
-        LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new 
LinkedHashMap<>(topicIdPartitions.size());
+        ArrayList<TopicIdPartition> suffixPartitions = new 
ArrayList<>(rotateAt);
+        ArrayList<TopicIdPartition> rotatedPartitions = new 
ArrayList<>(topicIdPartitions.size());
         int i = 0;
-        for (Map.Entry<TopicIdPartition, Integer> entry : 
topicIdPartitions.entrySet()) {
+        for (TopicIdPartition topicIdPartition : topicIdPartitions) {
             if (i < rotateAt) {
-                suffixPartitions.put(entry.getKey(), entry.getValue());
+                suffixPartitions.add(topicIdPartition);
             } else {
-                rotatedPartitions.put(entry.getKey(), entry.getValue());
+                rotatedPartitions.add(topicIdPartition);
             }
             i++;
         }
-        rotatedPartitions.putAll(suffixPartitions);
+        rotatedPartitions.addAll(suffixPartitions);

Review Comment:
   Should we use `Collections.rotate` instead?



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -427,29 +426,20 @@ private CompletableFuture<Map<TopicIdPartition, 
ShareAcknowledgeResponseData.Par
     /**
      * The newContext method is used to create a new share fetch context for 
every share fetch request.
      * @param groupId The group id in the share fetch request.
-     * @param shareFetchData The topic-partitions and their corresponding 
maxBytes data in the share fetch request.
+     * @param shareFetchData The topic-partitions in the share fetch request.
      * @param toForget The topic-partitions to forget present in the share 
fetch request.
      * @param reqMetadata The metadata in the share fetch request.
      * @param isAcknowledgeDataPresent This tells whether the fetch request 
received includes piggybacked acknowledgements or not
      * @return The new share fetch context object
      */
-    public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, 
ShareFetchRequest.SharePartitionData> shareFetchData,
+    public ShareFetchContext newContext(String groupId, List<TopicIdPartition> 
shareFetchData,
                                         List<TopicIdPartition> toForget, 
ShareRequestMetadata reqMetadata, Boolean isAcknowledgeDataPresent) {
         ShareFetchContext context;
-        // TopicPartition with maxBytes as 0 should not be added in the 
cachedPartitions
-        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> 
shareFetchDataWithMaxBytes = new HashMap<>();
-        shareFetchData.forEach((tp, sharePartitionData) -> {
-            if (sharePartitionData.maxBytes > 0) 
shareFetchDataWithMaxBytes.put(tp, sharePartitionData);
-        });
         // If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should 
remove the existing sessions. Also, start a
         // new session in case it is INITIAL_EPOCH. Hence, we need to treat 
them as special cases.
         if (reqMetadata.isFull()) {
             ShareSessionKey key = shareSessionKey(groupId, 
reqMetadata.memberId());
             if (reqMetadata.epoch() == ShareRequestMetadata.FINAL_EPOCH) {
-                // If the epoch is FINAL_EPOCH, don't try to create a new 
session.
-                if (!shareFetchDataWithMaxBytes.isEmpty()) {
-                    throw Errors.INVALID_REQUEST.exception();
-                }

Review Comment:
   Shouldn't we have a similar check i.e. if topic partitions are included in 
fetch request with final epoch?



##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java:
##########
@@ -110,25 +109,20 @@ public synchronized LastUsedKey lastUsedKey() {
         return new LastUsedKey(key, lastUsedMs);
     }
 
-    // Visible for testing
-    public synchronized long creationMs() {
-        return creationMs;
-    }

Review Comment:
   Why we have removed the creationMs, is it not being used?



##########
server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java:
##########
@@ -30,34 +30,30 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.LinkedHashMap;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 
 import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Helper functions for writing share fetch unit tests.
  */
 public class ShareFetchTestUtils {
 
     /**
-     * Create an ordered map of TopicIdPartition to partition max bytes.
+     * Create an ArrayList of topic partitions.
      *
-     * @param partitionMaxBytes The maximum number of bytes that can be 
fetched for each partition.
-     * @param topicIdPartitions The topic partitions to create the map for.
-     * @return The ordered map of TopicIdPartition to partition max bytes.
+     * @param topicIdPartitions The topic partitions to create the list for.
+     * @return The list of topic partitions.
      */
-    public static LinkedHashMap<TopicIdPartition, Integer> orderedMap(int 
partitionMaxBytes, TopicIdPartition... topicIdPartitions) {
-        LinkedHashMap<TopicIdPartition, Integer> map = new LinkedHashMap<>();
-        for (TopicIdPartition tp : topicIdPartitions) {
-            map.put(tp, partitionMaxBytes);
-        }
-        return map;
+    public static ArrayList<TopicIdPartition> arrayList(TopicIdPartition... 
topicIdPartitions) {
+        ArrayList<TopicIdPartition> list = new ArrayList<>();
+        Collections.addAll(list, topicIdPartitions);
+        return list;
     }

Review Comment:
   Why not to use List.of()? Do we need mutable list?



##########
clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java:
##########
@@ -226,23 +189,18 @@ public int maxWait() {
         return data.maxWaitMs();
     }
 
-    public Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> 
shareFetchData(Map<Uuid, String> topicNames) {
+    public List<TopicIdPartition> shareFetchData(Map<Uuid, String> topicNames) 
{
         if (shareFetchData == null) {
             synchronized (this) {
                 if (shareFetchData == null) {
                     // Assigning the lazy-initialized `shareFetchData` in the 
last step
                     // to avoid other threads accessing a half-initialized 
object.
-                    final LinkedHashMap<TopicIdPartition, 
ShareFetchRequest.SharePartitionData> shareFetchDataTmp = new LinkedHashMap<>();
+                    final ArrayList<TopicIdPartition> shareFetchDataTmp = new 
ArrayList<>();

Review Comment:
   ```suggestion
                       final List<TopicIdPartition> shareFetchDataTmp = new 
ArrayList<>();
   ```



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -134,7 +132,7 @@ public void 
testDelayedShareFetchTryCompleteReturnsFalseDueToNonAcquirablePartit
         sharePartitions.put(tp1, sp1);
 
         ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, 
Uuid.randomUuid().toString(),
-            new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 
MAX_FETCH_RECORDS,
+            new CompletableFuture<>(), arrayList(tp0, tp1), BATCH_SIZE, 
MAX_FETCH_RECORDS,

Review Comment:
   ```suggestion
               new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE, 
MAX_FETCH_RECORDS,
   ```



##########
clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java:
##########
@@ -151,7 +149,7 @@ public String toString() {
     }
 
     private final ShareFetchRequestData data;
-    private volatile LinkedHashMap<TopicIdPartition, 
ShareFetchRequest.SharePartitionData> shareFetchData = null;
+    private volatile ArrayList<TopicIdPartition> shareFetchData = null;

Review Comment:
   Is there a reason to use ArrayList specifically and not declaring the `List` 
interface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to