adixitconfluent commented on code in PR #18696: URL: https://github.com/apache/kafka/pull/18696#discussion_r1935001816
########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -648,6 +678,29 @@ public ShareAcquiredRecords acquire( } InFlightBatch inFlightBatch = entry.getValue(); + + // If baseOffset is less than the key of the entry, this means the fetch happened for a gap in the cachedState. + // Thus, a new batch needs to be acquired for the gap. + if (baseOffset < entry.getKey()) { + // This is to check whether the fetched records are all part of the gap, or they overlap with the next + // inFlight batch in the cachedState + if (lastBatch.lastOffset() < (entry.getKey())) { + // The entire request batch is part of the gap + return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(), + baseOffset, lastBatch.lastOffset(), batchSize, maxFetchRecords); + } else { + result.add(new AcquiredRecords() + .setFirstOffset(baseOffset) + .setLastOffset(entry.getKey() - 1) + .setDeliveryCount((short) 1)); + acquiredCount += (int) (entry.getKey() - baseOffset); + acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(), + baseOffset, entry.getKey() - 1, batchSize, maxFetchRecords); + baseOffset = inFlightBatch.lastOffset() + 1; + continue; + } + } + baseOffset = inFlightBatch.lastOffset() + 1; Review Comment: Can we also add some comments explaining why we need to make this change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org