xintongsong commented on code in PR #23255:
URL: https://github.com/apache/flink/pull/23255#discussion_r1319297747


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +87,79 @@ long getPriority(
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+    class PartialBuffer extends CompositeBuffer {
+
+        public PartialBuffer(BufferHeader bufferHeader) {
+            super(bufferHeader);
+        }
+    }
+
+    /**
+     * A wrapper class of the reading buffer result, including the read 
buffers, the hint of
+     * continue reading, and the read progress, etc.
+     */
+    class ReadBufferResult {
+
+        /** The read buffers. */
+        private final List<Buffer> readBuffers;
+
+        /**
+         * A hint to determine whether the caller should continue reading the 
following buffers.
+         * Note that this hint is merely a recommendation and not obligatory. 
Following the hint
+         * while reading buffers may improve performance.
+         */
+        private final boolean shouldContinueReadHint;

Review Comment:
   ```suggestion
           private final boolean continuousReadSuggested;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +87,79 @@ long getPriority(
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+    class PartialBuffer extends CompositeBuffer {
+
+        public PartialBuffer(BufferHeader bufferHeader) {
+            super(bufferHeader);
+        }
+    }

Review Comment:
   Why not use `CompositeBuffer` directly?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java:
##########
@@ -396,13 +417,46 @@ private void prepareForScheduling() {
             if (nextSegmentId < 0) {
                 updateSegmentId();
             }
+            if (nextSegmentId < 0) {
+                priority = Long.MAX_VALUE;
+                return;
+            }
             priority =
-                    nextSegmentId < 0
-                            ? Long.MAX_VALUE
+                    readProgress != null
+                            ? readProgress.getCurrentReadOffset()

Review Comment:
   This implies the I/O scheduler understands that the priority is a file offset.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +87,79 @@ long getPriority(
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+    class PartialBuffer extends CompositeBuffer {
+
+        public PartialBuffer(BufferHeader bufferHeader) {
+            super(bufferHeader);
+        }
+    }
+
+    /**
+     * A wrapper class of the reading buffer result, including the read 
buffers, the hint of
+     * continue reading, and the read progress, etc.
+     */
+    class ReadBufferResult {
+
+        /** The read buffers. */
+        private final List<Buffer> readBuffers;
+
+        /**
+         * A hint to determine whether the caller should continue reading the 
following buffers.
+         * Note that this hint is merely a recommendation and not obligatory. 
Following the hint
+         * while reading buffers may improve performance.
+         */
+        private final boolean shouldContinueReadHint;
+
+        /** The read progress state. */
+        private final ReadProgress readProgress;
+
+        public ReadBufferResult(
+                List<Buffer> readBuffers,
+                boolean shouldContinueReadHint,
+                ReadProgress readProgress) {
+            this.readBuffers = readBuffers;
+            this.shouldContinueReadHint = shouldContinueReadHint;
+            this.readProgress = readProgress;
+        }
+
+        public List<Buffer> getReadBuffers() {
+            return readBuffers;
+        }
+
+        public boolean shouldContinueReadHint() {
+            return shouldContinueReadHint;
+        }
+
+        public ReadProgress getReadProgress() {
+            return readProgress;
+        }
+    }
+
+    /** The {@link ReadProgress} mainly includes current reading offset, end 
of read offset, etc. */
+    class ReadProgress {
+
+        /**
+         * The current read file offset. Note the offset does not contain the 
length of the partial
+         * buffer, because the partial buffer may be dropped at anytime.
+         */
+        private final long currentReadOffset;

Review Comment:
   ```suggestion
           private final long currentBufferOffset;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +87,79 @@ long getPriority(
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+    class PartialBuffer extends CompositeBuffer {
+
+        public PartialBuffer(BufferHeader bufferHeader) {
+            super(bufferHeader);
+        }
+    }
+
+    /**
+     * A wrapper class of the reading buffer result, including the read 
buffers, the hint of
+     * continue reading, and the read progress, etc.
+     */
+    class ReadBufferResult {
+
+        /** The read buffers. */
+        private final List<Buffer> readBuffers;
+
+        /**
+         * A hint to determine whether the caller should continue reading the 
following buffers.
+         * Note that this hint is merely a recommendation and not obligatory. 
Following the hint
+         * while reading buffers may improve performance.
+         */
+        private final boolean shouldContinueReadHint;
+
+        /** The read progress state. */
+        private final ReadProgress readProgress;
+
+        public ReadBufferResult(
+                List<Buffer> readBuffers,
+                boolean shouldContinueReadHint,
+                ReadProgress readProgress) {
+            this.readBuffers = readBuffers;
+            this.shouldContinueReadHint = shouldContinueReadHint;
+            this.readProgress = readProgress;
+        }
+
+        public List<Buffer> getReadBuffers() {
+            return readBuffers;
+        }
+
+        public boolean shouldContinueReadHint() {
+            return shouldContinueReadHint;
+        }
+
+        public ReadProgress getReadProgress() {
+            return readProgress;
+        }
+    }
+
+    /** The {@link ReadProgress} mainly includes current reading offset, end 
of read offset, etc. */
+    class ReadProgress {

Review Comment:
   I think this should be an interface, so that:
   1. The caller of the reader won't be aware of the progress information.
   2. We can provide different implementations for different partition file readers.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java:
##########
@@ -354,34 +357,52 @@ private void loadDiskDataToBuffers(Queue<MemorySegment> 
buffers, BufferRecycler
                                 + subpartitionId
                                 + " has already been failed.");
             }
-            while (!buffers.isEmpty()
-                    && nettyConnectionWriter.numQueuedBufferPayloads() < 
maxBufferReadAhead
-                    && nextSegmentId >= 0) {
-                MemorySegment memorySegment = buffers.poll();
-                Buffer buffer;
-                try {
-                    if ((buffer =
-                                    partitionFileReader.readBuffer(
-                                            partitionId,
-                                            subpartitionId,
-                                            nextSegmentId,
-                                            nextBufferIndex,
-                                            memorySegment,
-                                            recycler))
-                            == null) {
+
+            PartitionFileReader.PartialBuffer partialBuffer = null;
+            boolean shouldContinueRead = true;
+            try {
+                while (!buffers.isEmpty() && shouldContinueRead && 
nextSegmentId >= 0) {
+                    MemorySegment memorySegment = buffers.poll();
+                    PartitionFileReader.ReadBufferResult readBufferResult;
+                    try {
+                        readBufferResult =
+                                partitionFileReader.readBuffer(
+                                        partitionId,
+                                        subpartitionId,
+                                        nextSegmentId,
+                                        nextBufferIndex,
+                                        memorySegment,
+                                        recycler,
+                                        readProgress,
+                                        partialBuffer);
+                        if (readBufferResult == null) {
+                            buffers.add(memorySegment);
+                            break;
+                        }
+                    } catch (Throwable throwable) {
+                        buffers.add(memorySegment);
+                        throw throwable;
+                    }
+
+                    List<Buffer> readBuffers = 
readBufferResult.getReadBuffers();
+                    shouldContinueRead = 
readBufferResult.shouldContinueReadHint();
+                    readProgress = readBufferResult.getReadProgress();
+                    if (!shouldContinueRead) {
+                        checkState(
+                                readProgress.getCurrentReadOffset()
+                                        == readProgress.getEndOfReadOffset());
+                        readProgress = null;

Review Comment:
   This implies the I/O scheduler understands the relationship between `shouldContinueRead` and `readProgress`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java:
##########
@@ -257,20 +266,28 @@ public FixedSizeRegion readRegionFromFile(FileChannel 
channel, long fileOffset)
      */
     public static class FixedSizeRegion implements 
FileDataIndexRegionHelper.Region {
 
-        public static final int REGION_SIZE = Integer.BYTES + Long.BYTES + 
Integer.BYTES;
+        public static final int REGION_SIZE =
+                Integer.BYTES + Long.BYTES + Integer.BYTES + Long.BYTES;
 
         /** The buffer index of first buffer. */
         private final int firstBufferIndex;
 
         /** The file offset of the region. */
         private final long regionFileOffset;
 
+        private final long regionFileEndOffset;

Review Comment:
   And `regionFileOffset` should become `regionBeginOffset`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +87,79 @@ long getPriority(
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+    class PartialBuffer extends CompositeBuffer {
+
+        public PartialBuffer(BufferHeader bufferHeader) {
+            super(bufferHeader);
+        }
+    }
+
+    /**
+     * A wrapper class of the reading buffer result, including the read 
buffers, the hint of
+     * continue reading, and the read progress, etc.
+     */
+    class ReadBufferResult {
+
+        /** The read buffers. */
+        private final List<Buffer> readBuffers;
+
+        /**
+         * A hint to determine whether the caller should continue reading the 
following buffers.
+         * Note that this hint is merely a recommendation and not obligatory. 
Following the hint
+         * while reading buffers may improve performance.
+         */
+        private final boolean shouldContinueReadHint;
+
+        /** The read progress state. */
+        private final ReadProgress readProgress;
+
+        public ReadBufferResult(
+                List<Buffer> readBuffers,
+                boolean shouldContinueReadHint,
+                ReadProgress readProgress) {
+            this.readBuffers = readBuffers;
+            this.shouldContinueReadHint = shouldContinueReadHint;
+            this.readProgress = readProgress;
+        }
+
+        public List<Buffer> getReadBuffers() {
+            return readBuffers;
+        }
+
+        public boolean shouldContinueReadHint() {
+            return shouldContinueReadHint;
+        }
+
+        public ReadProgress getReadProgress() {
+            return readProgress;
+        }
+    }
+
+    /** The {@link ReadProgress} mainly includes current reading offset, end 
of read offset, etc. */
+    class ReadProgress {
+
+        /**
+         * The current read file offset. Note the offset does not contain the 
length of the partial
+         * buffer, because the partial buffer may be dropped at anytime.
+         */
+        private final long currentReadOffset;
+
+        /** The end of read file offset. */
+        private final long endOfReadOffset;

Review Comment:
   ```suggestion
           private final long endOfRegionOffset;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -176,91 +162,179 @@ private void lazyInitializeFileChannel() {
     }
 
     /**
-     * Try to get the cache according to the key.
+     * Slice the read memory segment to multiple small network buffers.
      *
-     * <p>If the relevant buffer offset cache exists, it will be returned and 
subsequently removed.
-     * However, if the buffer offset cache does not exist, a new cache will be 
created using the
-     * data index and returned.
+     * <p>Note that although the method appears to be split into multiple 
buffers, the sliced
+     * buffers still share the same one actual underlying memory segment.
      *
-     * @param cacheKey the key of cache.
-     * @param removeKey boolean decides whether to remove key.
-     * @return returns the relevant buffer offset cache if it exists, 
otherwise return {@link
-     *     Optional#empty()}.
+     * @param byteBuffer the byte buffer to be sliced, it points to the 
underlying memorySegment
+     * @param memorySegment the underlying memory segment to be sliced
+     * @param partialBuffer the partial buffer, if the partial buffer is not 
null, it contains the
+     *     partial data buffer from the previous read
+     * @param readBuffers the read buffers list is to accept the sliced buffers
+     * @return the first field is the partial data buffer, the second field is 
the number of sliced
+     *     bytes.
      */
-    private Optional<BufferOffsetCache> tryGetCache(
-            Tuple2<TieredStorageSubpartitionId, Integer> cacheKey, boolean 
removeKey) {
-        BufferOffsetCache bufferOffsetCache = 
bufferOffsetCaches.remove(cacheKey);
-        if (bufferOffsetCache == null) {
-            Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> 
regionOpt =
-                    dataIndex.getRegion(cacheKey.f0, cacheKey.f1);
-            return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, 
region));
-        } else {
-            if (removeKey) {
-                numCaches--;
-            } else {
-                bufferOffsetCaches.put(cacheKey, bufferOffsetCache);
+    private Tuple2<PartialBuffer, Integer> sliceBuffer(

Review Comment:
   Why do we need `PartialBuffer` in the return value? I think we only need the total and partial lengths.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java:
##########
@@ -257,20 +266,28 @@ public FixedSizeRegion readRegionFromFile(FileChannel 
channel, long fileOffset)
      */
     public static class FixedSizeRegion implements 
FileDataIndexRegionHelper.Region {
 
-        public static final int REGION_SIZE = Integer.BYTES + Long.BYTES + 
Integer.BYTES;
+        public static final int REGION_SIZE =
+                Integer.BYTES + Long.BYTES + Integer.BYTES + Long.BYTES;
 
         /** The buffer index of first buffer. */
         private final int firstBufferIndex;
 
         /** The file offset of the region. */
         private final long regionFileOffset;
 
+        private final long regionFileEndOffset;

Review Comment:
   ```suggestion
           private final long regionEndOffset;
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -176,91 +162,179 @@ private void lazyInitializeFileChannel() {
     }
 
     /**
-     * Try to get the cache according to the key.
+     * Slice the read memory segment to multiple small network buffers.
      *
-     * <p>If the relevant buffer offset cache exists, it will be returned and 
subsequently removed.
-     * However, if the buffer offset cache does not exist, a new cache will be 
created using the
-     * data index and returned.
+     * <p>Note that although the method appears to be split into multiple 
buffers, the sliced
+     * buffers still share the same one actual underlying memory segment.
      *
-     * @param cacheKey the key of cache.
-     * @param removeKey boolean decides whether to remove key.
-     * @return returns the relevant buffer offset cache if it exists, 
otherwise return {@link
-     *     Optional#empty()}.
+     * @param byteBuffer the byte buffer to be sliced, it points to the 
underlying memorySegment
+     * @param memorySegment the underlying memory segment to be sliced
+     * @param partialBuffer the partial buffer, if the partial buffer is not 
null, it contains the
+     *     partial data buffer from the previous read
+     * @param readBuffers the read buffers list is to accept the sliced buffers
+     * @return the first field is the partial data buffer, the second field is 
the number of sliced
+     *     bytes.
      */
-    private Optional<BufferOffsetCache> tryGetCache(
-            Tuple2<TieredStorageSubpartitionId, Integer> cacheKey, boolean 
removeKey) {
-        BufferOffsetCache bufferOffsetCache = 
bufferOffsetCaches.remove(cacheKey);
-        if (bufferOffsetCache == null) {
-            Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> 
regionOpt =
-                    dataIndex.getRegion(cacheKey.f0, cacheKey.f1);
-            return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, 
region));
-        } else {
-            if (removeKey) {
-                numCaches--;
-            } else {
-                bufferOffsetCaches.put(cacheKey, bufferOffsetCache);
+    private Tuple2<PartialBuffer, Integer> sliceBuffer(
+            ByteBuffer byteBuffer,
+            MemorySegment memorySegment,
+            @Nullable PartialBuffer partialBuffer,
+            BufferRecycler bufferRecycler,
+            List<Buffer> readBuffers) {
+        checkState(reusedHeaderBuffer.position() == 0);
+        checkState(partialBuffer == null || partialBuffer.missingLength() > 0);
+
+        NetworkBuffer buffer = new NetworkBuffer(memorySegment, 
bufferRecycler);
+        buffer.setSize(byteBuffer.remaining());
+
+        try {
+            int numSlicedBytes = 0;
+            if (partialBuffer != null) {
+                // If there is a previous small partial buffer, the current 
read operation should
+                // read additional data and combine it with the existing 
partial to construct a new
+                // complete buffer
+                buffer.retainBuffer();
+                int position = byteBuffer.position() + 
partialBuffer.missingLength();
+                int numPartialBytes = partialBuffer.missingLength();
+                partialBuffer.addPartialBuffer(
+                        buffer.readOnlySlice(byteBuffer.position(), 
numPartialBytes));
+                numSlicedBytes += numPartialBytes;
+                byteBuffer.position(position);
+                readBuffers.add(partialBuffer);
             }
-            return Optional.of(bufferOffsetCache);
+
+            partialBuffer = null;
+            while (byteBuffer.hasRemaining()) {
+                // Parse the small buffer's header
+                BufferHeader header = parseBufferHeader(byteBuffer);
+                if (header == null) {
+                    // If the remaining data length in the buffer is not 
enough to construct a new
+                    // complete buffer header, drop it directly.
+                    break;
+                } else {
+                    numSlicedBytes += HEADER_LENGTH;
+                }
+
+                if (header.getLength() <= byteBuffer.remaining()) {
+                    // The remaining data length in the buffer is enough to 
generate a new small
+                    // sliced network buffer. The small sliced buffer is not a 
partial buffer, we
+                    // should read the slice of the buffer directly
+                    buffer.retainBuffer();
+                    CompositeBuffer slicedBuffer = new CompositeBuffer(header);

Review Comment:
   Why use `CompositeBuffer` here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -78,4 +87,79 @@ long getPriority(
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /** A {@link PartialBuffer} is a part slice of a larger buffer. */
+    class PartialBuffer extends CompositeBuffer {
+
+        public PartialBuffer(BufferHeader bufferHeader) {
+            super(bufferHeader);
+        }
+    }
+
+    /**
+     * A wrapper class of the reading buffer result, including the read 
buffers, the hint of
+     * continue reading, and the read progress, etc.
+     */
+    class ReadBufferResult {
+
+        /** The read buffers. */
+        private final List<Buffer> readBuffers;
+
+        /**
+         * A hint to determine whether the caller should continue reading the 
following buffers.
+         * Note that this hint is merely a recommendation and not obligatory. 
Following the hint
+         * while reading buffers may improve performance.
+         */
+        private final boolean shouldContinueReadHint;
+
+        /** The read progress state. */
+        private final ReadProgress readProgress;
+
+        public ReadBufferResult(
+                List<Buffer> readBuffers,
+                boolean shouldContinueReadHint,
+                ReadProgress readProgress) {
+            this.readBuffers = readBuffers;
+            this.shouldContinueReadHint = shouldContinueReadHint;
+            this.readProgress = readProgress;
+        }
+
+        public List<Buffer> getReadBuffers() {
+            return readBuffers;
+        }
+
+        public boolean shouldContinueReadHint() {
+            return shouldContinueReadHint;
+        }
+
+        public ReadProgress getReadProgress() {
+            return readProgress;
+        }
+    }
+
+    /** The {@link ReadProgress} mainly includes current reading offset, end 
of read offset, etc. */
+    class ReadProgress {

Review Comment:
   The only case where we need this outside the partition file reader is to calculate the priority of subpartition readers. For that, we can make `ReadProgress` an argument of `PartitionFileReader#getPriority`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -176,91 +162,179 @@ private void lazyInitializeFileChannel() {
     }
 
     /**
-     * Try to get the cache according to the key.
+     * Slice the read memory segment to multiple small network buffers.
      *
-     * <p>If the relevant buffer offset cache exists, it will be returned and 
subsequently removed.
-     * However, if the buffer offset cache does not exist, a new cache will be 
created using the
-     * data index and returned.
+     * <p>Note that although the method appears to be split into multiple 
buffers, the sliced
+     * buffers still share the same one actual underlying memory segment.
      *
-     * @param cacheKey the key of cache.
-     * @param removeKey boolean decides whether to remove key.
-     * @return returns the relevant buffer offset cache if it exists, 
otherwise return {@link
-     *     Optional#empty()}.
+     * @param byteBuffer the byte buffer to be sliced, it points to the 
underlying memorySegment
+     * @param memorySegment the underlying memory segment to be sliced
+     * @param partialBuffer the partial buffer, if the partial buffer is not 
null, it contains the
+     *     partial data buffer from the previous read
+     * @param readBuffers the read buffers list is to accept the sliced buffers
+     * @return the first field is the partial data buffer, the second field is 
the number of sliced
+     *     bytes.
      */
-    private Optional<BufferOffsetCache> tryGetCache(
-            Tuple2<TieredStorageSubpartitionId, Integer> cacheKey, boolean 
removeKey) {
-        BufferOffsetCache bufferOffsetCache = 
bufferOffsetCaches.remove(cacheKey);
-        if (bufferOffsetCache == null) {
-            Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> 
regionOpt =
-                    dataIndex.getRegion(cacheKey.f0, cacheKey.f1);
-            return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, 
region));
-        } else {
-            if (removeKey) {
-                numCaches--;
-            } else {
-                bufferOffsetCaches.put(cacheKey, bufferOffsetCache);
+    private Tuple2<PartialBuffer, Integer> sliceBuffer(
+            ByteBuffer byteBuffer,
+            MemorySegment memorySegment,
+            @Nullable PartialBuffer partialBuffer,
+            BufferRecycler bufferRecycler,
+            List<Buffer> readBuffers) {
+        checkState(reusedHeaderBuffer.position() == 0);
+        checkState(partialBuffer == null || partialBuffer.missingLength() > 0);
+
+        NetworkBuffer buffer = new NetworkBuffer(memorySegment, 
bufferRecycler);
+        buffer.setSize(byteBuffer.remaining());
+
+        try {
+            int numSlicedBytes = 0;
+            if (partialBuffer != null) {
+                // If there is a previous small partial buffer, the current 
read operation should
+                // read additional data and combine it with the existing 
partial to construct a new
+                // complete buffer
+                buffer.retainBuffer();
+                int position = byteBuffer.position() + 
partialBuffer.missingLength();
+                int numPartialBytes = partialBuffer.missingLength();
+                partialBuffer.addPartialBuffer(
+                        buffer.readOnlySlice(byteBuffer.position(), 
numPartialBytes));
+                numSlicedBytes += numPartialBytes;
+                byteBuffer.position(position);
+                readBuffers.add(partialBuffer);
             }
-            return Optional.of(bufferOffsetCache);
+
+            partialBuffer = null;
+            while (byteBuffer.hasRemaining()) {
+                // Parse the small buffer's header
+                BufferHeader header = parseBufferHeader(byteBuffer);
+                if (header == null) {
+                    // If the remaining data length in the buffer is not 
enough to construct a new
+                    // complete buffer header, drop it directly.
+                    break;
+                } else {
+                    numSlicedBytes += HEADER_LENGTH;
+                }
+
+                if (header.getLength() <= byteBuffer.remaining()) {
+                    // The remaining data length in the buffer is enough to 
generate a new small
+                    // sliced network buffer. The small sliced buffer is not a 
partial buffer, we
+                    // should read the slice of the buffer directly
+                    buffer.retainBuffer();
+                    CompositeBuffer slicedBuffer = new CompositeBuffer(header);
+                    slicedBuffer.addPartialBuffer(
+                            buffer.readOnlySlice(byteBuffer.position(), 
header.getLength()));
+                    byteBuffer.position(byteBuffer.position() + 
header.getLength());
+                    numSlicedBytes += header.getLength();
+                    readBuffers.add(slicedBuffer);
+                } else {
+                    // The remaining data length in the buffer is smaller than 
the actual length of
+                    // the buffer, so we should generate a new partial buffer, 
allowing for
+                    // generating a new complete buffer during the next read 
operation
+                    buffer.retainBuffer();
+                    int numPartialBytes = byteBuffer.remaining();
+                    numSlicedBytes += numPartialBytes;
+                    partialBuffer = new PartialBuffer(header);
+                    partialBuffer.addPartialBuffer(
+                            buffer.readOnlySlice(byteBuffer.position(), 
numPartialBytes));
+                    readBuffers.add(partialBuffer);
+                    break;
+                }
+            }
+            return Tuple2.of(partialBuffer, numSlicedBytes);
+        } catch (Throwable throwable) {
+            LOG.error("Failed to slice the read buffer {}.", byteBuffer, 
throwable);
+            throw throwable;
+        } finally {
+            buffer.recycleBuffer();
         }
     }
 
     /**
-     * The {@link BufferOffsetCache} represents the file offset cache for a 
buffer index. Each cache
-     * includes file offset of the buffer index, the region containing the 
buffer index and next
-     * buffer index to consume.
+     * Return a tuple of the start and end file offset, or return null if the 
buffer is not found in
+     * the data index.
      */
-    private class BufferOffsetCache {
+    @Nullable
+    private Tuple2<Long, Long> getReadStartAndEndOffset(
+            TieredStorageSubpartitionId subpartitionId,
+            int bufferIndex,
+            @Nullable ReadProgress readProgress,
+            @Nullable PartialBuffer partialBuffer) {
+        long readStartOffset;
+        long readEndOffset;
+        if (readProgress == null) {
+            Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> 
regionOpt =
+                    dataIndex.getRegion(subpartitionId, bufferIndex);
+            if (!regionOpt.isPresent()) {
+                return null;
+            }
+            readStartOffset = regionOpt.get().getRegionFileOffset();
+            readEndOffset = regionOpt.get().getRegionFileEndOffset();
+        } else {
+            readStartOffset =
+                    readProgress.getCurrentReadOffset() + 
partialBufferReadBytes(partialBuffer);
+            readEndOffset = readProgress.getEndOfReadOffset();
+        }
 
-        private final ProducerMergedPartitionFileIndex.FixedSizeRegion region;
+        checkState(readStartOffset <= readEndOffset);
+        return Tuple2.of(readStartOffset, readEndOffset);
+    }
 
-        private long fileOffset;
+    private static ReadBufferResult getReadBufferResult(
+            List<Buffer> readBuffers,
+            long readStartOffset,
+            long readEndOffset,
+            int numBytesToRead,
+            Tuple2<PartialBuffer, Integer> partialAndReadBytes) {

Review Comment:
   Shouldn't use the tuple here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to