TanYuxin-tyx commented on code in PR #23255:
URL: https://github.com/apache/flink/pull/23255#discussion_r1321213128


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -51,101 +60,91 @@
  */
 public class ProducerMergedPartitionFileReader implements PartitionFileReader {
 
-    /**
-     * Max number of caches.
-     *
-     * <p>The constant defines the maximum number of caches that can be 
created. Its value is set to
-     * 10000, which is considered sufficient for most parallel jobs. Each 
cache only contains
-     * references and numerical variables and occupies a minimal amount of 
memory so the value is
-     * not excessively large.
-     */
-    private static final int DEFAULT_MAX_CACHE_NUM = 10000;
-
-    /**
-     * Buffer offset caches stored in map.
-     *
-     * <p>The key is the combination of {@link TieredStorageSubpartitionId} 
and buffer index. The
-     * value is the buffer offset cache, which includes file offset of the 
buffer index, the region
-     * containing the buffer index and next buffer index to consume.
-     */
-    private final Map<Tuple2<TieredStorageSubpartitionId, Integer>, 
BufferOffsetCache>
-            bufferOffsetCaches;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ProducerMergedPartitionFileReader.class);
 
     private final ByteBuffer reusedHeaderBuffer = 
BufferReaderWriterUtil.allocatedHeaderBuffer();
 
     private final Path dataFilePath;
 
     private final ProducerMergedPartitionFileIndex dataIndex;
 
-    private final int maxCacheNumber;
-
     private volatile FileChannel fileChannel;
 
-    /** The current number of caches. */
-    private int numCaches;
-
-    ProducerMergedPartitionFileReader(
-            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
-        this(dataFilePath, dataIndex, DEFAULT_MAX_CACHE_NUM);
-    }
-
     @VisibleForTesting
     ProducerMergedPartitionFileReader(
-            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex, int 
maxCacheNumber) {
+            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
         this.dataFilePath = dataFilePath;
         this.dataIndex = dataIndex;
-        this.bufferOffsetCaches = new HashMap<>();
-        this.maxCacheNumber = maxCacheNumber;
     }
 
     @Override
-    public Buffer readBuffer(
+    public ReadBufferResult readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable ReadProgress readProgress,
+            @Nullable CompositeBuffer partialBuffer)
             throws IOException {
 
         lazyInitializeFileChannel();
-        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
-                Tuple2.of(subpartitionId, bufferIndex);
-        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
-        if (!cache.isPresent()) {
+
+        // Get the read offset, including the start offset, the end offset
+        Tuple2<Long, Long> startAndEndOffset =
+                getReadStartAndEndOffset(subpartitionId, bufferIndex, 
readProgress, partialBuffer);
+        if (startAndEndOffset == null) {
             return null;
         }
-        fileChannel.position(cache.get().getFileOffset());
-        Buffer buffer =
-                readFromByteChannel(fileChannel, reusedHeaderBuffer, 
memorySegment, recycler);
-        boolean hasNextBuffer =
-                cache.get()
-                        .advance(
-                                checkNotNull(buffer).readableBytes()
-                                        + 
BufferReaderWriterUtil.HEADER_LENGTH);
-        if (hasNextBuffer) {
-            int nextBufferIndex = bufferIndex + 1;
-            // TODO: introduce the LRU cache strategy in the future to 
restrict the total
-            // cache number. Testing to prevent cache leaks has been 
implemented.
-            if (numCaches < maxCacheNumber) {
-                bufferOffsetCaches.put(Tuple2.of(subpartitionId, 
nextBufferIndex), cache.get());
-                numCaches++;
-            }
+        long readStartOffset = startAndEndOffset.f0;
+        long readEndOffset = startAndEndOffset.f1;
+
+        int numBytesToRead =
+                Math.min(memorySegment.size(), (int) (readEndOffset - 
readStartOffset));
+
+        if (numBytesToRead == 0) {
+            return null;
         }
-        return buffer;
+
+        List<Buffer> readBuffers = new LinkedList<>();
+        ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+        fileChannel.position(readStartOffset);
+        // Read data to the memory segment, note the read size is 
numBytesToRead
+        readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+        // Slice the read memory segment to multiple small network buffers and 
add them to
+        // readBuffers
+        Tuple2<Integer, Integer> partial =
+                sliceBuffer(byteBuffer, memorySegment, partialBuffer, 
recycler, readBuffers);
+
+        return getReadBufferResult(
+                readBuffers,
+                readStartOffset,
+                readEndOffset,
+                numBytesToRead,
+                partial.f0,
+                partial.f1);
     }
 
     @Override
     public long getPriority(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
-            int bufferIndex) {
+            int bufferIndex,
+            ReadProgress readProgress) {
         lazyInitializeFileChannel();
-        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
-                Tuple2.of(subpartitionId, bufferIndex);
-        return tryGetCache(cacheKey, false)
-                .map(BufferOffsetCache::getFileOffset)
+        if (readProgress != null) {
+            checkState(
+                    readProgress instanceof 
ProducerMergedPartitionFile.ProducerMergedReadProgress);
+            return ((ProducerMergedPartitionFile.ProducerMergedReadProgress) 
readProgress)
+                    .getCurrentBufferOffset();

Review Comment:
   Ok, Fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to