adixitconfluent commented on code in PR #17870:
URL: https://github.com/apache/kafka/pull/17870#discussion_r1912187645


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -390,18 +407,27 @@ private void handleFetchException(
     }
 
     // Visible for testing.
-    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
                                                                 LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
-        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+        LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
             if (!existingFetchedData.containsKey(topicIdPartition)) {
-                missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
+                missingLogReadTopicPartitions.put(topicIdPartition, fetchOffset);
             }
         });
         if (missingLogReadTopicPartitions.isEmpty()) {
             return existingFetchedData;
         }
-        LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
+
+        // Computing the total bytes that has already been fetched for the existing fetched data.
+        int totalPartitionMaxBytesUsed = 0;
+        for (LogReadResult logReadResult : existingFetchedData.values()) {
+            totalPartitionMaxBytesUsed += logReadResult.info().records.sizeInBytes();
+        }
+
Review Comment:
   yeah, this is a slightly better version of the UNIFORM strategy. The other strategies might already depend on produce volume, so it may already be handled over there and this code might not be useful. Okay, I'll change it to a basic version where `partitionMaxBytes` for leftover partitions will depend solely on the number of acquired topic partitions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
