apoorvmittal10 commented on code in PR #18804: URL: https://github.com/apache/kafka/pull/18804#discussion_r1965415311
########## core/src/main/java/kafka/server/share/ShareFetchUtils.java: ########## @@ -187,4 +186,67 @@ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) { } return partition; } + + /** + * Slice the fetch records based on the acquired records. The slicing is done based on the first + * and last offset of the acquired records from the list. The slicing doesn't consider individual + * acquired batches rather the boundaries of the acquired list. The method expects the acquired + * records list to be within the fetch records bounds. + * + * @param records The records to be sliced. + * @param shareAcquiredRecords The share acquired records containing the non-empty acquired records. + * @return The sliced records, if the records are of type FileRecords and the acquired records are a subset + * of the fetched records. Otherwise, the original records are returned. + */ + static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) { + if (!(records instanceof FileRecords fileRecords)) { + return records; + } + // The acquired records should be non-empty, do not check as the method is called only when the + // acquired records are non-empty. + List<AcquiredRecords> acquiredRecords = shareAcquiredRecords.acquiredRecords(); + try { + final Iterator<FileChannelRecordBatch> iterator = fileRecords.batchIterator(); + // Track the first overlapping batch with the first acquired offset. + FileChannelRecordBatch firstOverlapBatch = iterator.next(); + // If there exists single fetch batch, then return the original records. + if (!iterator.hasNext()) { + return records; + } + // Find the first and last acquired offset to slice the records. + final long firstAcquiredOffset = acquiredRecords.get(0).firstOffset(); + final long lastAcquiredOffset = acquiredRecords.get(acquiredRecords.size() - 1).lastOffset(); + int startPosition = 0; + int size = 0; + while (iterator.hasNext()) { + FileChannelRecordBatch batch = iterator.next(); + // Iterate until finds the first overlap batch with the first acquired offset. All the + // batches before this first overlap batch should be sliced hence increment the start + // position. + if (firstOverlapBatch.baseOffset() < firstAcquiredOffset && batch.baseOffset() <= firstAcquiredOffset) { Review Comment: Yeah, my bad. I was doing a lot of refactoring and missed this. You are right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org