TanYuxin-tyx commented on code in PR #23255:
URL: https://github.com/apache/flink/pull/23255#discussion_r1321213128
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########
@@ -51,101 +60,91 @@
  */
 public class ProducerMergedPartitionFileReader implements PartitionFileReader {
 
-    /**
-     * Max number of caches.
-     *
-     * <p>The constant defines the maximum number of caches that can be created. Its value is set to
-     * 10000, which is considered sufficient for most parallel jobs. Each cache only contains
-     * references and numerical variables and occupies a minimal amount of memory so the value is
-     * not excessively large.
-     */
-    private static final int DEFAULT_MAX_CACHE_NUM = 10000;
-
-    /**
-     * Buffer offset caches stored in map.
-     *
-     * <p>The key is the combination of {@link TieredStorageSubpartitionId} and buffer index. The
-     * value is the buffer offset cache, which includes file offset of the buffer index, the region
-     * containing the buffer index and next buffer index to consume.
-     */
-    private final Map<Tuple2<TieredStorageSubpartitionId, Integer>, BufferOffsetCache>
-            bufferOffsetCaches;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ProducerMergedPartitionFileReader.class);
 
     private final ByteBuffer reusedHeaderBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
 
     private final Path dataFilePath;
 
     private final ProducerMergedPartitionFileIndex dataIndex;
 
-    private final int maxCacheNumber;
-
     private volatile FileChannel fileChannel;
 
-    /** The current number of caches. */
-    private int numCaches;
-
-    ProducerMergedPartitionFileReader(
-            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
-        this(dataFilePath, dataIndex, DEFAULT_MAX_CACHE_NUM);
-    }
-
     @VisibleForTesting
     ProducerMergedPartitionFileReader(
-            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex, int maxCacheNumber) {
+            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
         this.dataFilePath = dataFilePath;
         this.dataIndex = dataIndex;
-        this.bufferOffsetCaches = new HashMap<>();
-        this.maxCacheNumber = maxCacheNumber;
     }
 
     @Override
-    public Buffer readBuffer(
+    public ReadBufferResult readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable ReadProgress readProgress,
+            @Nullable CompositeBuffer partialBuffer)
             throws IOException {
         lazyInitializeFileChannel();
-        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
-                Tuple2.of(subpartitionId, bufferIndex);
-        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
-        if (!cache.isPresent()) {
+
+        // Get the read range, i.e. the start offset and the end offset
+        Tuple2<Long, Long> startAndEndOffset =
+                getReadStartAndEndOffset(subpartitionId, bufferIndex, readProgress, partialBuffer);
+        if (startAndEndOffset == null) {
             return null;
         }
-        fileChannel.position(cache.get().getFileOffset());
-        Buffer buffer =
-                readFromByteChannel(fileChannel, reusedHeaderBuffer, memorySegment, recycler);
-        boolean hasNextBuffer =
-                cache.get()
-                        .advance(
-                                checkNotNull(buffer).readableBytes()
-                                        + BufferReaderWriterUtil.HEADER_LENGTH);
-        if (hasNextBuffer) {
-            int nextBufferIndex = bufferIndex + 1;
-            // TODO: introduce the LRU cache strategy in the future to restrict the total
-            // cache number. Testing to prevent cache leaks has been implemented.
-            if (numCaches < maxCacheNumber) {
-                bufferOffsetCaches.put(Tuple2.of(subpartitionId, nextBufferIndex), cache.get());
-                numCaches++;
-            }
+        long readStartOffset = startAndEndOffset.f0;
+        long readEndOffset = startAndEndOffset.f1;
+
+        int numBytesToRead =
+                Math.min(memorySegment.size(), (int) (readEndOffset - readStartOffset));
+
+        if (numBytesToRead == 0) {
+            return null;
         }
-        return buffer;
+
+        List<Buffer> readBuffers = new LinkedList<>();
+        ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+        fileChannel.position(readStartOffset);
+        // Read data into the memory segment; note that the read size is numBytesToRead
+        readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+        // Slice the read memory segment into multiple small network buffers and add them to
+        // readBuffers
+        Tuple2<Integer, Integer> partial =
+                sliceBuffer(byteBuffer, memorySegment, partialBuffer, recycler, readBuffers);
+
+        return getReadBufferResult(
+                readBuffers,
+                readStartOffset,
+                readEndOffset,
+                numBytesToRead,
+                partial.f0,
+                partial.f1);
     }
 
     @Override
     public long getPriority(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
-            int bufferIndex) {
+            int bufferIndex,
+            ReadProgress readProgress) {
         lazyInitializeFileChannel();
-        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
-                Tuple2.of(subpartitionId, bufferIndex);
-        return tryGetCache(cacheKey, false)
-                .map(BufferOffsetCache::getFileOffset)
+        if (readProgress != null) {
+            checkState(
+                    readProgress instanceof ProducerMergedPartitionFile.ProducerMergedReadProgress);
+            return ((ProducerMergedPartitionFile.ProducerMergedReadProgress) readProgress)
+                    .getCurrentBufferOffset();

Review Comment:
   Ok, Fixed it.
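Editor's note for readers skimming the thread: the reworked readBuffer above no longer caches per-buffer file offsets. It derives a read range, clamps it to the size of the supplied memory segment (numBytesToRead), performs one positioned bulk read, and then slices the bytes into individual buffers, handing any buffer that is cut off at the end of the range back to the caller as a partial buffer for the next call. The standalone sketch below illustrates that read-then-slice pattern with plain java.nio; the RangeSliceReader class, the 4-byte length-prefixed record layout, and the data.bin path are illustrative assumptions, not the PR's actual sliceBuffer/getReadStartAndEndOffset implementation.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Standalone sketch: one bulk read of the range [startOffset, endOffset), clamped to the
// buffer capacity, then sliced into 4-byte-length-prefixed records. Trailing bytes that do
// not form a complete record are returned as "partial" and must be prepended to the next read.
public final class RangeSliceReader {

    public static final class SliceResult {
        public final List<byte[]> records = new ArrayList<>();
        public byte[] partial = new byte[0];
    }

    public static SliceResult readAndSlice(
            FileChannel channel,
            long startOffset,
            long endOffset,
            ByteBuffer readBuffer,
            byte[] previousPartial)
            throws IOException {
        SliceResult result = new SliceResult();
        // Clamp the read size to the buffer capacity, mirroring
        // Math.min(memorySegment.size(), readEndOffset - readStartOffset) in the diff.
        int numBytesToRead = (int) Math.min(readBuffer.capacity(), endOffset - startOffset);
        if (numBytesToRead == 0) {
            result.partial = previousPartial;
            return result;
        }

        // One positioned bulk read instead of one read per buffer.
        readBuffer.clear().limit(numBytesToRead);
        channel.position(startOffset);
        while (readBuffer.hasRemaining()) {
            if (channel.read(readBuffer) < 0) {
                throw new IOException("Unexpected end of file at " + channel.position());
            }
        }
        readBuffer.flip();

        // Prepend the partial record left over from the previous call.
        ByteBuffer data = ByteBuffer.allocate(previousPartial.length + numBytesToRead);
        data.put(previousPartial).put(readBuffer).flip();

        // Slice into records: a 4-byte length header followed by the payload.
        while (data.remaining() >= Integer.BYTES) {
            data.mark();
            int length = data.getInt();
            if (data.remaining() < length) {
                data.reset(); // incomplete record, keep it as the new partial
                break;
            }
            byte[] record = new byte[length];
            data.get(record);
            result.records.add(record);
        }
        result.partial = new byte[data.remaining()];
        data.get(result.partial);
        return result;
    }

    public static void main(String[] args) throws IOException {
        // "data.bin" is a placeholder file containing length-prefixed records.
        try (FileChannel channel = FileChannel.open(Path.of("data.bin"), StandardOpenOption.READ)) {
            SliceResult r = readAndSlice(
                    channel, 0L, channel.size(), ByteBuffer.allocate(32 * 1024), new byte[0]);
            System.out.println(r.records.size() + " records, " + r.partial.length + " partial bytes");
        }
    }
}

The same carry-over idea appears in the diff as the partialBuffer argument and the two integers returned by sliceBuffer; reading the whole range once and slicing it in memory keeps the number of file seeks proportional to read calls rather than to buffers.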
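On the getPriority change: the method now reports the buffer offset recorded in the ProducerMergedReadProgress instead of looking it up in the removed offset cache. Below is a hedged, self-contained sketch of how such an offset can serve as a priority, assuming the scheduler orders pending reads by offset so the file is traversed mostly sequentially; the ReadRequest type and the priority queue are illustrative assumptions, not the scheduler used by this PR.

import java.util.Comparator;
import java.util.PriorityQueue;

// Standalone sketch: order pending reads by the file offset reported by their progress,
// so the lowest offset (closest to sequential access) is served first.
public final class OffsetPriorityExample {

    static final class ReadRequest {
        final String subpartition;
        final long currentFileOffset; // Long.MAX_VALUE means "no progress recorded yet"

        ReadRequest(String subpartition, long currentFileOffset) {
            this.subpartition = subpartition;
            this.currentFileOffset = currentFileOffset;
        }
    }

    public static void main(String[] args) {
        PriorityQueue<ReadRequest> queue =
                new PriorityQueue<>(
                        Comparator.comparingLong((ReadRequest r) -> r.currentFileOffset));
        queue.add(new ReadRequest("subpartition-1", 4096L));
        queue.add(new ReadRequest("subpartition-0", 512L));
        queue.add(new ReadRequest("subpartition-2", Long.MAX_VALUE));

        while (!queue.isEmpty()) {
            ReadRequest next = queue.poll();
            System.out.println("serve " + next.subpartition + " at offset " + next.currentFileOffset);
        }
    }
}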