adixitconfluent commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1935001816


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -648,6 +678,29 @@ public ShareAcquiredRecords acquire(
                 }
 
                 InFlightBatch inFlightBatch = entry.getValue();
+
+                // If baseOffset is less than the key of the entry, this means the fetch happened for a gap in the cachedState.
+                // Thus, a new batch needs to be acquired for the gap.
+                if (baseOffset < entry.getKey()) {
+                    // This is to check whether the fetched records are all part of the gap, or they overlap with the next
+                    // inFlight batch in the cachedState
+                    if (lastBatch.lastOffset() < (entry.getKey())) {
+                        // The entire request batch is part of the gap
+                        return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                            baseOffset, lastBatch.lastOffset(), batchSize, maxFetchRecords);
+                    } else {
+                        result.add(new AcquiredRecords()
+                            .setFirstOffset(baseOffset)
+                            .setLastOffset(entry.getKey() - 1)
+                            .setDeliveryCount((short) 1));
+                        acquiredCount += (int) (entry.getKey() - baseOffset);
+                        acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                            baseOffset, entry.getKey() - 1, batchSize, maxFetchRecords);
+                        baseOffset = inFlightBatch.lastOffset() + 1;
+                        continue;
+                    }
+                }
+                baseOffset = inFlightBatch.lastOffset() + 1;

Review Comment:
   Can we also add some comments explaining why this change is needed?
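
   For reference, the case split in the hunk above can be sketched in isolation as follows. This is a simplified illustration only, not the SharePartition code: the class `GapSplitSketch`, the method `gapRange`, and the parameter `nextCachedStart` (standing in for `entry.getKey()`) are hypothetical names, and the sketch ignores `batchSize`, `maxFetchRecords`, and the in-flight state entirely.

```java
// Hypothetical, simplified illustration; not the SharePartition implementation.
final class GapSplitSketch {

    /**
     * Returns {first, last} for the part of the fetched range
     * [baseOffset, lastOffset] that lies before nextCachedStart (the gap),
     * or an empty array when the fetched range does not start in a gap.
     */
    static long[] gapRange(long baseOffset, long lastOffset, long nextCachedStart) {
        if (baseOffset >= nextCachedStart) {
            // The fetch starts at or after the cached batch: no gap to acquire.
            return new long[0];
        }
        if (lastOffset < nextCachedStart) {
            // The entire fetched range falls inside the gap.
            return new long[] {baseOffset, lastOffset};
        }
        // The fetch overlaps the cached batch: only the prefix is in the gap.
        return new long[] {baseOffset, nextCachedStart - 1};
    }
}
```

   With these assumptions, a fetch covering offsets 5 to 14 against a cached batch starting at 10 gives a gap of 5 to 9, i.e. `nextCachedStart - baseOffset` = 5 records, which matches the `acquiredCount` increment in the diff.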



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
