apoorvmittal10 commented on code in PR #18804:
URL: https://github.com/apache/kafka/pull/18804#discussion_r1965415311


##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -187,4 +186,67 @@ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) {
         }
         return partition;
     }
+
+    /**
+     * Slice the fetch records based on the acquired records. The slicing is done based on the first
+     * and last offset of the acquired records from the list. The slicing doesn't consider individual
+     * acquired batches, rather the boundaries of the acquired list. The method expects the acquired
+     * records list to be within the fetch records bounds.
+     *
+     * @param records The records to be sliced.
+     * @param shareAcquiredRecords The share acquired records containing the non-empty acquired records.
+     * @return The sliced records, if the records are of type FileRecords and the acquired records are a subset
+     *         of the fetched records. Otherwise, the original records are returned.
+     */
+    static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
+        if (!(records instanceof FileRecords fileRecords)) {
+            return records;
+        }
+        // The acquired records should be non-empty; no need to check, as the method is called only
+        // when the acquired records are non-empty.
+        List<AcquiredRecords> acquiredRecords = shareAcquiredRecords.acquiredRecords();
+        try {
+            final Iterator<FileChannelRecordBatch> iterator = fileRecords.batchIterator();
+            // Track the first batch that overlaps the first acquired offset.
+            FileChannelRecordBatch firstOverlapBatch = iterator.next();
+            // If there is only a single fetch batch, then return the original records.
+            if (!iterator.hasNext()) {
+                return records;
+            }
+            // Find the first and last acquired offsets to slice the records.
+            final long firstAcquiredOffset = acquiredRecords.get(0).firstOffset();
+            final long lastAcquiredOffset = acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
+            int startPosition = 0;
+            int size = 0;
+            while (iterator.hasNext()) {
+                FileChannelRecordBatch batch = iterator.next();
+                // Iterate until the first batch that overlaps the first acquired offset is found. All
+                // the batches before this first overlapping batch should be sliced away, hence increment
+                // the start position.
+                if (firstOverlapBatch.baseOffset() < firstAcquiredOffset && batch.baseOffset() <= firstAcquiredOffset) {

Review Comment:
   Yeah, my bad. I was doing a lot of refactoring and missed this. You are right.
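
   For readers following along, here is a minimal standalone sketch of the boundary arithmetic the new method performs. `FetchBatch` and the sample offsets are hypothetical stand-ins, not Kafka types, and the sketch assumes contiguous fetched batches whose byte positions line up with their cumulative sizes:

   ```java
   import java.util.List;

   public class SliceBoundarySketch {

       // Hypothetical stand-in for FileChannelRecordBatch, carrying only
       // the fields the boundary computation needs.
       record FetchBatch(long baseOffset, long lastOffset, int position, int sizeInBytes) {}

       // Returns {startPosition, size} of the byte range covering all batches
       // that overlap [firstAcquiredOffset, lastAcquiredOffset].
       static int[] sliceBounds(List<FetchBatch> batches, long firstAcquiredOffset, long lastAcquiredOffset) {
           int startPosition = 0;
           int endPosition = 0;
           for (FetchBatch batch : batches) {
               if (batch.lastOffset() < firstAcquiredOffset) {
                   // Batch lies entirely before the acquired range; skip its bytes.
                   startPosition += batch.sizeInBytes();
               }
               if (batch.baseOffset() <= lastAcquiredOffset) {
                   // Batch starts within the acquired range; extend the slice to cover it.
                   endPosition = batch.position() + batch.sizeInBytes();
               }
           }
           return new int[] {startPosition, endPosition - startPosition};
       }

       public static void main(String[] args) {
           // Three fetched batches of 100 bytes each, covering offsets 0-29.
           List<FetchBatch> batches = List.of(
               new FetchBatch(0, 9, 0, 100),
               new FetchBatch(10, 19, 100, 100),
               new FetchBatch(20, 29, 200, 100));
           // Acquired offsets 12..25 overlap the second and third batches, so
           // the slice starts at byte 100 and spans 200 bytes.
           int[] bounds = sliceBounds(batches, 12, 25);
           System.out.println("startPosition=" + bounds[0] + ", size=" + bounds[1]);
       }
   }
   ```

   The actual method presumably feeds these bounds to FileRecords#slice to return a view over only the acquired byte range; the sketch only mirrors the offset/position bookkeeping.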


