xintongsong commented on code in PR #23255:
URL: https://github.com/apache/flink/pull/23255#discussion_r1320960943


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -40,16 +42,22 @@ public interface PartitionFileReader {
      * @param bufferIndex the index of buffer
      * @param memorySegment the empty buffer to store the read buffer
      * @param recycler the buffer recycler
-     * @return null if there is no data otherwise a buffer.
+     * @param readProgress the current read process

Review Comment:
   ```suggestion
        * @param readProgress the current read progress
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -74,8 +82,58 @@ long getPriority(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
-            int bufferIndex);
+            int bufferIndex,
+            ReadProgress readProgress);
 
     /** Release the {@link PartitionFileReader}. */
     void release();
+
+    /**
+     * This {@link ReadProgress} defines the read progress of the {@link PartitionFileReader}.
+     *
+     * <p>Note that the implementation of the interface should strongly bind with the
+     * implementation of {@link PartitionFileReader}.
+     */
+    interface ReadProgress {}
+
+    /**
+     * A wrapper class of the read buffer result, including the read buffers, the hint of
+     * whether to continue reading, the read progress, etc.
+     */
+    class ReadBufferResult {
+
+        /** The read buffers. */
+        private final List<Buffer> readBuffers;
+
+        /**
+         * A hint to determine whether the caller may continue reading the following buffers. Note
+         * that this hint is merely a recommendation and not obligatory. Following the hint while
+         * reading buffers may improve performance.
+         */
+        private final boolean continuousReadSuggested;
+
+        /** The read progress state. */
+        private final ReadProgress readProgress;

Review Comment:
   This should be nullable.
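   
   A minimal sketch of how this could look, assuming Flink's usual `javax.annotation.Nullable` annotation (the getter shown is hypothetical):
   ```java
   /** The read progress state, or null if no progress needs to be carried over. */
   @Nullable private final ReadProgress readProgress;

   /** Returns the read progress of this read, which may be null. */
   @Nullable
   public ReadProgress getReadProgress() {
       return readProgress;
   }
   ```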



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -40,16 +42,22 @@ public interface PartitionFileReader {
      * @param bufferIndex the index of buffer
      * @param memorySegment the empty buffer to store the read buffer
      * @param recycler the buffer recycler
-     * @return null if there is no data otherwise a buffer.
+     * @param readProgress the current read process

Review Comment:
   We should explain that this comes from the previous `ReadBufferResult`.
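   
   E.g., something along these lines (exact wording is only a sketch):
   ```java
   /**
    * @param readProgress the read progress returned in the {@code ReadBufferResult} of the
    *     previous read, or null if this is the first read of the region
    */
   ```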



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -51,101 +60,91 @@
  */
 public class ProducerMergedPartitionFileReader implements PartitionFileReader {
 
-    /**
-     * Max number of caches.
-     *
-     * <p>The constant defines the maximum number of caches that can be created. Its value is set to
-     * 10000, which is considered sufficient for most parallel jobs. Each cache only contains
-     * references and numerical variables and occupies a minimal amount of memory so the value is
-     * not excessively large.
-     */
-    private static final int DEFAULT_MAX_CACHE_NUM = 10000;
-
-    /**
-     * Buffer offset caches stored in map.
-     *
-     * <p>The key is the combination of {@link TieredStorageSubpartitionId} and buffer index. The
-     * value is the buffer offset cache, which includes file offset of the buffer index, the region
-     * containing the buffer index and next buffer index to consume.
-     */
-    private final Map<Tuple2<TieredStorageSubpartitionId, Integer>, BufferOffsetCache>
-            bufferOffsetCaches;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ProducerMergedPartitionFileReader.class);
 
     private final ByteBuffer reusedHeaderBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
 
     private final Path dataFilePath;
 
     private final ProducerMergedPartitionFileIndex dataIndex;
 
-    private final int maxCacheNumber;
-
     private volatile FileChannel fileChannel;
 
-    /** The current number of caches. */
-    private int numCaches;
-
-    ProducerMergedPartitionFileReader(
-            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
-        this(dataFilePath, dataIndex, DEFAULT_MAX_CACHE_NUM);
-    }
-
     @VisibleForTesting
     ProducerMergedPartitionFileReader(
-            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex, int maxCacheNumber) {
+            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
         this.dataFilePath = dataFilePath;
         this.dataIndex = dataIndex;
-        this.bufferOffsetCaches = new HashMap<>();
-        this.maxCacheNumber = maxCacheNumber;
     }
 
     @Override
-    public Buffer readBuffer(
+    public ReadBufferResult readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable ReadProgress readProgress,
+            @Nullable CompositeBuffer partialBuffer)
             throws IOException {
 
         lazyInitializeFileChannel();
-        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
-                Tuple2.of(subpartitionId, bufferIndex);
-        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
-        if (!cache.isPresent()) {
+
+        // Get the read offset, including the start offset, the end offset
+        Tuple2<Long, Long> startAndEndOffset =
+                getReadStartAndEndOffset(subpartitionId, bufferIndex, readProgress, partialBuffer);
+        if (startAndEndOffset == null) {
             return null;
         }
-        fileChannel.position(cache.get().getFileOffset());
-        Buffer buffer =
-                readFromByteChannel(fileChannel, reusedHeaderBuffer, 
memorySegment, recycler);
-        boolean hasNextBuffer =
-                cache.get()
-                        .advance(
-                                checkNotNull(buffer).readableBytes()
-                                        + 
BufferReaderWriterUtil.HEADER_LENGTH);
-        if (hasNextBuffer) {
-            int nextBufferIndex = bufferIndex + 1;
-            // TODO: introduce the LRU cache strategy in the future to 
restrict the total
-            // cache number. Testing to prevent cache leaks has been 
implemented.
-            if (numCaches < maxCacheNumber) {
-                bufferOffsetCaches.put(Tuple2.of(subpartitionId, 
nextBufferIndex), cache.get());
-                numCaches++;
-            }
+        long readStartOffset = startAndEndOffset.f0;
+        long readEndOffset = startAndEndOffset.f1;
+
+        int numBytesToRead =
+                Math.min(memorySegment.size(), (int) (readEndOffset - readStartOffset));
+
+        if (numBytesToRead == 0) {
+            return null;
         }
-        return buffer;
+
+        List<Buffer> readBuffers = new LinkedList<>();
+        ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+        fileChannel.position(readStartOffset);
+        // Read data to the memory segment, note the read size is numBytesToRead
+        readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+        // Slice the read memory segment to multiple small network buffers and add them to
+        // readBuffers
+        Tuple2<Integer, Integer> partial =
+                sliceBuffer(byteBuffer, memorySegment, partialBuffer, recycler, readBuffers);
+
+        return getReadBufferResult(
+                readBuffers,
+                readStartOffset,
+                readEndOffset,
+                numBytesToRead,
+                partial.f0,
+                partial.f1);
     }
 
     @Override
     public long getPriority(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
-            int bufferIndex) {
+            int bufferIndex,
+            ReadProgress readProgress) {
         lazyInitializeFileChannel();
-        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
-                Tuple2.of(subpartitionId, bufferIndex);
-        return tryGetCache(cacheKey, false)
-                .map(BufferOffsetCache::getFileOffset)
+        if (readProgress != null) {
+            checkState(
+                    readProgress instanceof ProducerMergedPartitionFile.ProducerMergedReadProgress);
+            return ((ProducerMergedPartitionFile.ProducerMergedReadProgress) readProgress)
+                    .getCurrentBufferOffset();

Review Comment:
   Shouldn't we check whether the current offset is the end of the region?
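   
   I.e., something along these lines (just a sketch; whether to then fall back to the data index lookup is an open question):
   ```java
   if (readProgress != null) {
       checkState(
               readProgress instanceof ProducerMergedPartitionFile.ProducerMergedReadProgress);
       ProducerMergedPartitionFile.ProducerMergedReadProgress progress =
               (ProducerMergedPartitionFile.ProducerMergedReadProgress) readProgress;
       // Only reuse the cached offset while the current region still has unread data.
       if (progress.getCurrentBufferOffset() < progress.getEndOfRegionOffset()) {
           return progress.getCurrentBufferOffset();
       }
       // Otherwise fall through and look the region up from the data index again.
   }
   ```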



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########
@@ -74,8 +82,58 @@ long getPriority(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
-            int bufferIndex);
+            int bufferIndex,
+            ReadProgress readProgress);

Review Comment:
   JavaDoc not updated.
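   
   E.g., the new parameter needs a matching tag; a rough sketch of the wording:
   ```java
   /**
    * @param readProgress the read progress carried over from the previous {@link ReadBufferResult},
    *     or null when reading a region from the beginning
    */
   ```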



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFile.java:
##########
@@ -39,4 +39,32 @@ public static ProducerMergedPartitionFileReader createPartitionFileReader(
             Path dataFilePath, ProducerMergedPartitionFileIndex partitionFileIndex) {
         return new ProducerMergedPartitionFileReader(dataFilePath, partitionFileIndex);
     }
+
+    /**
+     * The implementation of {@link PartitionFileReader.ReadProgress} mainly includes current
+     * reading offset, end of read offset, etc.
+     */
+    public static class ProducerMergedReadProgress implements PartitionFileReader.ReadProgress {

Review Comment:
   This probably belongs to `ProducerMergedPartitionFileReader`.
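   
   I.e., nest it in the reader instead; a sketch (the fields stay as in this PR):
   ```java
   public class ProducerMergedPartitionFileReader implements PartitionFileReader {
       // ... existing reader code ...

       /** Read progress specific to the producer-merged partition file. */
       public static class ProducerMergedReadProgress implements PartitionFileReader.ReadProgress {
           // currentBufferOffset and endOfRegionOffset, as defined in this PR
       }
   }
   ```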



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -176,91 +175,189 @@ private void lazyInitializeFileChannel() {
     }
 
     /**
-     * Try to get the cache according to the key.
+     * Slice the read memory segment to multiple small network buffers.
      *
-     * <p>If the relevant buffer offset cache exists, it will be returned and subsequently removed.
-     * However, if the buffer offset cache does not exist, a new cache will be created using the
-     * data index and returned.
+     * <p>Note that although the method appears to be split into multiple buffers, the sliced
+     * buffers still share the same one actual underlying memory segment.
      *
-     * @param cacheKey the key of cache.
-     * @param removeKey boolean decides whether to remove key.
-     * @return returns the relevant buffer offset cache if it exists, otherwise return {@link
-     *     Optional#empty()}.
+     * @param byteBuffer the byte buffer to be sliced, it points to the underlying memorySegment
+     * @param memorySegment the underlying memory segment to be sliced
+     * @param partialBuffer the partial buffer, if the partial buffer is not null, it contains the
+     *     partial data buffer from the previous read
+     * @param readBuffers the read buffers list is to accept the sliced buffers
+     * @return the first field is the number of total sliced bytes, the second field is the bytes of
+     *     the partial buffer
      */
-    private Optional<BufferOffsetCache> tryGetCache(
-            Tuple2<TieredStorageSubpartitionId, Integer> cacheKey, boolean removeKey) {
-        BufferOffsetCache bufferOffsetCache = bufferOffsetCaches.remove(cacheKey);
-        if (bufferOffsetCache == null) {
-            Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> regionOpt =
-                    dataIndex.getRegion(cacheKey.f0, cacheKey.f1);
-            return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, region));
-        } else {
-            if (removeKey) {
-                numCaches--;
-            } else {
-                bufferOffsetCaches.put(cacheKey, bufferOffsetCache);
+    private Tuple2<Integer, Integer> sliceBuffer(
+            ByteBuffer byteBuffer,
+            MemorySegment memorySegment,
+            @Nullable CompositeBuffer partialBuffer,
+            BufferRecycler bufferRecycler,
+            List<Buffer> readBuffers) {
+        checkState(reusedHeaderBuffer.position() == 0);
+        checkState(partialBuffer == null || partialBuffer.missingLength() > 0);
+
+        NetworkBuffer buffer = new NetworkBuffer(memorySegment, bufferRecycler);
+        buffer.setSize(byteBuffer.remaining());
+
+        try {
+            int numSlicedBytes = 0;
+            if (partialBuffer != null) {
+                // If there is a previous small partial buffer, the current read operation should
+                // read additional data and combine it with the existing partial to construct a new
+                // complete buffer
+                buffer.retainBuffer();
+                int position = byteBuffer.position() + partialBuffer.missingLength();
+                int numPartialBytes = partialBuffer.missingLength();
+                partialBuffer.addPartialBuffer(
+                        buffer.readOnlySlice(byteBuffer.position(), numPartialBytes));
+                numSlicedBytes += numPartialBytes;
+                byteBuffer.position(position);
+                readBuffers.add(partialBuffer);
+            }
+
+            partialBuffer = null;
+            while (byteBuffer.hasRemaining()) {
+                // Parse the small buffer's header
+                BufferHeader header = parseBufferHeader(byteBuffer);
+                if (header == null) {
+                    // If the remaining data length in the buffer is not enough to construct a new
+                    // complete buffer header, drop it directly.
+                    break;
+                } else {
+                    numSlicedBytes += HEADER_LENGTH;
+                }
+
+                if (header.getLength() <= byteBuffer.remaining()) {
+                    // The remaining data length in the buffer is enough to generate a new small
+                    // sliced network buffer. The small sliced buffer is not a partial buffer, we
+                    // should read the slice of the buffer directly
+                    buffer.retainBuffer();
+                    ReadOnlySlicedNetworkBuffer slicedBuffer =
+                            buffer.readOnlySlice(byteBuffer.position(), header.getLength());
+                    slicedBuffer.setDataType(header.getDataType());
+                    slicedBuffer.setCompressed(header.isCompressed());
+                    byteBuffer.position(byteBuffer.position() + header.getLength());
+                    numSlicedBytes += header.getLength();
+                    readBuffers.add(slicedBuffer);
+                } else {
+                    // The remaining data length in the buffer is smaller than the actual length of
+                    // the buffer, so we should generate a new partial buffer, allowing for
+                    // generating a new complete buffer during the next read operation
+                    buffer.retainBuffer();
+                    int numPartialBytes = byteBuffer.remaining();
+                    numSlicedBytes += numPartialBytes;
+                    partialBuffer = new CompositeBuffer(header);
+                    partialBuffer.addPartialBuffer(
+                            buffer.readOnlySlice(byteBuffer.position(), numPartialBytes));
+                    readBuffers.add(partialBuffer);
+                    break;
+                }
             }
-            return Optional.of(bufferOffsetCache);
+            return Tuple2.of(numSlicedBytes, partialBufferReadBytes(partialBuffer));
+        } catch (Throwable throwable) {
+            LOG.error("Failed to slice the read buffer {}.", byteBuffer, throwable);
+            throw throwable;
+        } finally {
+            buffer.recycleBuffer();
         }
     }
 
     /**
-     * The {@link BufferOffsetCache} represents the file offset cache for a buffer index. Each cache
-     * includes file offset of the buffer index, the region containing the buffer index and next
-     * buffer index to consume.
+     * Return a tuple of the start and end file offset, or return null if the buffer is not found in
+     * the data index.
      */
-    private class BufferOffsetCache {
+    @Nullable
+    private Tuple2<Long, Long> getReadStartAndEndOffset(
+            TieredStorageSubpartitionId subpartitionId,
+            int bufferIndex,
+            @Nullable ReadProgress currentReadProgress,
+            @Nullable CompositeBuffer partialBuffer) {
+        ProducerMergedPartitionFile.ProducerMergedReadProgress readProgress =
+                convertToCurrentReadProgress(currentReadProgress);
+        long readStartOffset;
+        long readEndOffset;
+        if (readProgress == null
+                || readProgress.getCurrentBufferOffset() == readProgress.getEndOfRegionOffset()) {
+            Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> regionOpt =
+                    dataIndex.getRegion(subpartitionId, bufferIndex);
+            if (!regionOpt.isPresent()) {
+                return null;
+            }
+            readStartOffset = regionOpt.get().getRegionStartOffset();
+            readEndOffset = regionOpt.get().getRegionEndOffset();
+        } else {
+            readStartOffset =
+                    readProgress.getCurrentBufferOffset() + partialBufferReadBytes(partialBuffer);
+            readEndOffset = readProgress.getEndOfRegionOffset();
+        }
 
-        private final ProducerMergedPartitionFileIndex.FixedSizeRegion region;
+        checkState(readStartOffset <= readEndOffset);
+        return Tuple2.of(readStartOffset, readEndOffset);
+    }
 
-        private long fileOffset;
+    private static ReadBufferResult getReadBufferResult(
+            List<Buffer> readBuffers,
+            long readStartOffset,
+            long readEndOffset,
+            int numBytesToRead,
+            int numBytesRealRead,
+            int numBytesReadPartialBuffer) {
+        boolean shouldContinueRead = readStartOffset + numBytesRealRead < readEndOffset;
+        ProducerMergedPartitionFile.ProducerMergedReadProgress readProgress =
+                new ProducerMergedPartitionFile.ProducerMergedReadProgress(
+                        readStartOffset + numBytesRealRead - numBytesReadPartialBuffer,
+                        readEndOffset);
+        checkState(
+                numBytesRealRead <= numBytesToRead
+                        && numBytesToRead - numBytesRealRead < HEADER_LENGTH);
 
-        private int nextBufferIndex;
+        return new ReadBufferResult(readBuffers, shouldContinueRead, readProgress);
+    }
 
-        private BufferOffsetCache(
-                int bufferIndex, ProducerMergedPartitionFileIndex.FixedSizeRegion region) {
-            this.nextBufferIndex = bufferIndex;
-            this.region = region;
-            moveFileOffsetToBuffer(bufferIndex);
+    private void readFileDataToBuffer(
+            MemorySegment memorySegment, BufferRecycler recycler, ByteBuffer byteBuffer)
+            throws IOException {
+        try {
+            BufferReaderWriterUtil.readByteBufferFully(fileChannel, byteBuffer);
+            byteBuffer.flip();
+        } catch (Throwable throwable) {
+            recycler.recycle(memorySegment);
+            throw throwable;
         }
+    }
 
-        /**
-         * Get the file offset.
-         *
-         * @return the file offset.
-         */
-        private long getFileOffset() {
-            return fileOffset;
-        }
+    private static int partialBufferReadBytes(@Nullable CompositeBuffer partialBuffer) {

Review Comment:
   ```suggestion
       private static int getPartialBufferReadBytes(@Nullable CompositeBuffer partialBuffer) {
   ```


