adixitconfluent commented on code in PR #17870:
URL: https://github.com/apache/kafka/pull/17870#discussion_r1909105571


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,20 +212,21 @@ LinkedHashMap<TopicIdPartition, 
FetchRequest.PartitionData> acquirablePartitions
         LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData = new LinkedHashMap<>();
 
         sharePartitions.forEach((topicIdPartition, sharePartition) -> {
-            int partitionMaxBytes = 
shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
             // Add the share partition to the list of partitions to be fetched 
only if we can
             // acquire the fetch lock on it.
             if (sharePartition.maybeAcquireFetchLock()) {
                 try {
                     // If the share partition is already at capacity, we 
should not attempt to fetch.
                     if (sharePartition.canAcquireRecords()) {
+                        // We do not know the total partitions that can be 
acquired at this stage, hence we set maxBytes
+                        // to 0 for now and will update it before doing the 
replica manager fetch.
                         topicPartitionData.put(
                             topicIdPartition,
                             new FetchRequest.PartitionData(
                                 topicIdPartition.topicId(),
                                 sharePartition.nextFetchOffset(),
                                 0,
-                                partitionMaxBytes,
+                                0,

Review Comment:
   I have made the code change to delay the creation of 
`FetchRequest.PartitionData`. Thanks.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,20 +212,21 @@ LinkedHashMap<TopicIdPartition, 
FetchRequest.PartitionData> acquirablePartitions
         LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData = new LinkedHashMap<>();
 
         sharePartitions.forEach((topicIdPartition, sharePartition) -> {
-            int partitionMaxBytes = 
shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
             // Add the share partition to the list of partitions to be fetched 
only if we can
             // acquire the fetch lock on it.
             if (sharePartition.maybeAcquireFetchLock()) {
                 try {
                     // If the share partition is already at capacity, we 
should not attempt to fetch.
                     if (sharePartition.canAcquireRecords()) {
+                        // We do not know the total partitions that can be 
acquired at this stage, hence we set maxBytes
+                        // to 0 for now and will update it before doing the 
replica manager fetch.
                         topicPartitionData.put(
                             topicIdPartition,
                             new FetchRequest.PartitionData(
                                 topicIdPartition.topicId(),
                                 sharePartition.nextFetchOffset(),
                                 0,
-                                partitionMaxBytes,
+                                0,

Review Comment:
   I have made the code change to delay the creation of 
FetchRequest.PartitionData. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to