adixitconfluent commented on code in PR #17870:
URL: https://github.com/apache/kafka/pull/17870#discussion_r1912173308
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -390,18 +407,27 @@ private void handleFetchException(
     }

     // Visible for testing.
-    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
                                                                           LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
-        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+        LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
             if (!existingFetchedData.containsKey(topicIdPartition)) {
-                missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
+                missingLogReadTopicPartitions.put(topicIdPartition, fetchOffset);
             }
         });
         if (missingLogReadTopicPartitions.isEmpty()) {
             return existingFetchedData;
         }
-        LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
+
+        // Computing the total bytes that has already been fetched for the existing fetched data.
+        int totalPartitionMaxBytesUsed = 0;
+        for (LogReadResult logReadResult : existingFetchedData.values()) {
+            totalPartitionMaxBytesUsed += logReadResult.info().records.sizeInBytes();
+        }
+

Review Comment:
@apoorvmittal10, yes, we can do that as well. But I believe this dynamic approach of computing `partitionMaxBytes` for the leftover partitions, given the bytes already fetched for the other partitions, is better than blindly dividing the budget equally.
Even if we set `partitionMaxBytes` of some partitions to 2 MB, those partitions may not have enough data produced to them, so we might fetch only, say, 0.5 MB from each. With the dynamic approach, we give ourselves the opportunity to fetch more data from the leftover partitions, which is beneficial when they have heavier produce traffic than the former partitions.
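The dynamic-budget idea in the comment above can be sketched roughly as follows. This is a minimal, hypothetical illustration, not the code in `DelayedShareFetch`: the method name `remainingPartitionMaxBytes` and parameters such as `requestMaxBytes` are invented for the example. It subtracts the bytes already fetched from the request's total byte budget and divides the remainder evenly across the leftover partitions, so under-produced partitions in the first round leave a larger share for the rest.

```java
import java.util.LinkedHashMap;
import java.util.List;

public class PartitionMaxBytesSketch {

    /**
     * Hypothetical helper: after some partitions have already been read,
     * split the unused portion of the request's byte budget evenly across
     * the partitions that still need a log read.
     */
    static LinkedHashMap<String, Integer> remainingPartitionMaxBytes(
            int requestMaxBytes,
            int totalBytesAlreadyFetched,
            List<String> leftoverPartitions) {
        LinkedHashMap<String, Integer> result = new LinkedHashMap<>();
        if (leftoverPartitions.isEmpty()) {
            return result;
        }
        // Budget left over from the first round of reads. If the earlier
        // partitions were under-produced (0.5 MB fetched instead of a 2 MB
        // cap), the leftover partitions get a correspondingly larger share.
        int remaining = Math.max(0, requestMaxBytes - totalBytesAlreadyFetched);
        int perPartition = remaining / leftoverPartitions.size();
        for (String topicPartition : leftoverPartitions) {
            result.put(topicPartition, perPartition);
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 MB request budget; the first round fetched only 1 MB in total,
        // so 9 MB is split across the 3 leftover partitions (3 MB each)
        // rather than a fixed equal split computed up front.
        LinkedHashMap<String, Integer> alloc = remainingPartitionMaxBytes(
                10 * 1024 * 1024,
                1024 * 1024,
                List.of("tp-2", "tp-3", "tp-4"));
        System.out.println(alloc); // each leftover partition gets 3145728 bytes
    }
}
```

A static equal split would instead cap every partition at `requestMaxBytes / totalPartitions` regardless of how much the first round actually returned, which is the trade-off the comment argues against.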