xintongsong commented on code in PR #23255: URL: https://github.com/apache/flink/pull/23255#discussion_r1320960943
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########

@@ -40,16 +42,22 @@ public interface PartitionFileReader {
     * @param bufferIndex the index of buffer
     * @param memorySegment the empty buffer to store the read buffer
     * @param recycler the buffer recycler
-    * @return null if there is no data otherwise a buffer.
+    * @param readProgress the current read process

Review Comment:
```suggestion
     * @param readProgress the current read progress
```

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########

@@ -74,8 +82,58 @@ long getPriority(
            TieredStoragePartitionId partitionId,
            TieredStorageSubpartitionId subpartitionId,
            int segmentId,
-            int bufferIndex);
+            int bufferIndex,
+            ReadProgress readProgress);

    /** Release the {@link PartitionFileReader}. */
    void release();
+
+    /**
+     * This {@link ReadProgress} defines the read progress of the {@link PartitionFileReader}.
+     *
+     * <p>Note that the implementation of the interface should be strongly bound to the
+     * implementation of {@link PartitionFileReader}.
+     */
+    interface ReadProgress {}
+
+    /**
+     * A wrapper class for the buffer reading result, including the read buffers, the hint of
+     * whether to continue reading, the read progress, etc.
+     */
+    class ReadBufferResult {
+
+        /** The read buffers. */
+        private final List<Buffer> readBuffers;
+
+        /**
+         * A hint to determine whether the caller may continue reading the following buffers. Note
+         * that this hint is merely a recommendation and not obligatory. Following the hint while
+         * reading buffers may improve performance.
+         */
+        private final boolean continuousReadSuggested;
+
+        /** The read progress state. */
+        private final ReadProgress readProgress;

Review Comment:
This should be nullable.
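For illustration, a minimal sketch of the nullable variant. The constructor matches the `new ReadBufferResult(readBuffers, shouldContinueRead, readProgress)` call later in this PR; the getter and imports are assumptions added for completeness:

```java
import java.util.List;

import javax.annotation.Nullable;

class ReadBufferResult {

    private final List<Buffer> readBuffers;

    private final boolean continuousReadSuggested;

    /** The read progress state, or null if there is no progress to carry into the next read. */
    @Nullable private final ReadProgress readProgress;

    ReadBufferResult(
            List<Buffer> readBuffers,
            boolean continuousReadSuggested,
            @Nullable ReadProgress readProgress) {
        this.readBuffers = readBuffers;
        this.continuousReadSuggested = continuousReadSuggested;
        this.readProgress = readProgress;
    }

    @Nullable
    ReadProgress getReadProgress() {
        return readProgress;
    }
}
```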
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########

@@ -40,16 +42,22 @@ public interface PartitionFileReader {
     * @param bufferIndex the index of buffer
     * @param memorySegment the empty buffer to store the read buffer
     * @param recycler the buffer recycler
-    * @return null if there is no data otherwise a buffer.
+    * @param readProgress the current read process

Review Comment:
We should explain that this comes from the previous `ReadBufferResult`.
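For illustration, a hypothetical caller loop showing where the argument comes from. The getters on `ReadBufferResult` and the helpers `hasMoreBuffers()`/`consume()` are assumptions, not part of this PR:

```java
// The progress returned by one readBuffer call is fed back into the next one.
PartitionFileReader.ReadProgress progress = null; // nothing to resume before the first read
CompositeBuffer partialBuffer = null;             // no partial buffer before the first read
while (hasMoreBuffers()) {
    PartitionFileReader.ReadBufferResult result =
            reader.readBuffer(
                    partitionId,
                    subpartitionId,
                    segmentId,
                    bufferIndex,
                    memorySegment,
                    recycler,
                    progress,
                    partialBuffer);
    if (result == null) {
        break; // no data for this buffer index
    }
    consume(result.getReadBuffers());
    // Carry the progress of this read into the next readBuffer call.
    progress = result.getReadProgress();
}
```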
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########

@@ -51,101 +60,91 @@
  */
 public class ProducerMergedPartitionFileReader implements PartitionFileReader {

-    /**
-     * Max number of caches.
-     *
-     * <p>The constant defines the maximum number of caches that can be created. Its value is set to
-     * 10000, which is considered sufficient for most parallel jobs. Each cache only contains
-     * references and numerical variables and occupies a minimal amount of memory so the value is
-     * not excessively large.
-     */
-    private static final int DEFAULT_MAX_CACHE_NUM = 10000;
-
-    /**
-     * Buffer offset caches stored in map.
-     *
-     * <p>The key is the combination of {@link TieredStorageSubpartitionId} and buffer index. The
-     * value is the buffer offset cache, which includes file offset of the buffer index, the region
-     * containing the buffer index and next buffer index to consume.
-     */
-    private final Map<Tuple2<TieredStorageSubpartitionId, Integer>, BufferOffsetCache>
-            bufferOffsetCaches;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ProducerMergedPartitionFileReader.class);

     private final ByteBuffer reusedHeaderBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();

     private final Path dataFilePath;

     private final ProducerMergedPartitionFileIndex dataIndex;

-    private final int maxCacheNumber;
-
     private volatile FileChannel fileChannel;

-    /** The current number of caches. */
-    private int numCaches;
-
-    ProducerMergedPartitionFileReader(
-            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
-        this(dataFilePath, dataIndex, DEFAULT_MAX_CACHE_NUM);
-    }
-
     @VisibleForTesting
     ProducerMergedPartitionFileReader(
-            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex, int maxCacheNumber) {
+            Path dataFilePath, ProducerMergedPartitionFileIndex dataIndex) {
         this.dataFilePath = dataFilePath;
         this.dataIndex = dataIndex;
-        this.bufferOffsetCaches = new HashMap<>();
-        this.maxCacheNumber = maxCacheNumber;
     }

     @Override
-    public Buffer readBuffer(
+    public ReadBufferResult readBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
             int bufferIndex,
             MemorySegment memorySegment,
-            BufferRecycler recycler)
+            BufferRecycler recycler,
+            @Nullable ReadProgress readProgress,
+            @Nullable CompositeBuffer partialBuffer)
             throws IOException {
         lazyInitializeFileChannel();
-        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
-                Tuple2.of(subpartitionId, bufferIndex);
-        Optional<BufferOffsetCache> cache = tryGetCache(cacheKey, true);
-        if (!cache.isPresent()) {
+
+        // Get the read offsets, including the start offset and the end offset
+        Tuple2<Long, Long> startAndEndOffset =
+                getReadStartAndEndOffset(subpartitionId, bufferIndex, readProgress, partialBuffer);
+        if (startAndEndOffset == null) {
             return null;
         }
-        fileChannel.position(cache.get().getFileOffset());
-        Buffer buffer =
-                readFromByteChannel(fileChannel, reusedHeaderBuffer, memorySegment, recycler);
-        boolean hasNextBuffer =
-                cache.get()
-                        .advance(
-                                checkNotNull(buffer).readableBytes()
-                                        + BufferReaderWriterUtil.HEADER_LENGTH);
-        if (hasNextBuffer) {
-            int nextBufferIndex = bufferIndex + 1;
-            // TODO: introduce the LRU cache strategy in the future to restrict the total
-            // cache number. Testing to prevent cache leaks has been implemented.
-            if (numCaches < maxCacheNumber) {
-                bufferOffsetCaches.put(Tuple2.of(subpartitionId, nextBufferIndex), cache.get());
-                numCaches++;
-            }
+        long readStartOffset = startAndEndOffset.f0;
+        long readEndOffset = startAndEndOffset.f1;
+
+        int numBytesToRead =
+                Math.min(memorySegment.size(), (int) (readEndOffset - readStartOffset));
+
+        if (numBytesToRead == 0) {
+            return null;
         }
-        return buffer;
+
+        List<Buffer> readBuffers = new LinkedList<>();
+        ByteBuffer byteBuffer = memorySegment.wrap(0, numBytesToRead);
+        fileChannel.position(readStartOffset);
+        // Read data to the memory segment, note the read size is numBytesToRead
+        readFileDataToBuffer(memorySegment, recycler, byteBuffer);
+
+        // Slice the read memory segment into multiple small network buffers and add them to
+        // readBuffers
+        Tuple2<Integer, Integer> partial =
+                sliceBuffer(byteBuffer, memorySegment, partialBuffer, recycler, readBuffers);
+
+        return getReadBufferResult(
+                readBuffers,
+                readStartOffset,
+                readEndOffset,
+                numBytesToRead,
+                partial.f0,
+                partial.f1);
     }

     @Override
     public long getPriority(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
             int segmentId,
-            int bufferIndex) {
+            int bufferIndex,
+            ReadProgress readProgress) {
         lazyInitializeFileChannel();
-        Tuple2<TieredStorageSubpartitionId, Integer> cacheKey =
-                Tuple2.of(subpartitionId, bufferIndex);
-        return tryGetCache(cacheKey, false)
-                .map(BufferOffsetCache::getFileOffset)
+        if (readProgress != null) {
+            checkState(
+                    readProgress instanceof ProducerMergedPartitionFile.ProducerMergedReadProgress);
+            return ((ProducerMergedPartitionFile.ProducerMergedReadProgress) readProgress)
+                    .getCurrentBufferOffset();

Review Comment:
Shouldn't we check whether the current offset is the end of the region?
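A sketch of the check this question is pointing at, assuming the same semantics as `getReadStartAndEndOffset` below (reaching `getEndOfRegionOffset()` means the region is exhausted):

```java
if (readProgress != null) {
    checkState(
            readProgress instanceof ProducerMergedPartitionFile.ProducerMergedReadProgress);
    ProducerMergedPartitionFile.ProducerMergedReadProgress progress =
            (ProducerMergedPartitionFile.ProducerMergedReadProgress) readProgress;
    // Only return the cached offset while the progress is still inside the region.
    if (progress.getCurrentBufferOffset() < progress.getEndOfRegionOffset()) {
        return progress.getCurrentBufferOffset();
    }
    // Otherwise the region is exhausted and the offset should come from the data index.
}
```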
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/PartitionFileReader.java:
##########

@@ -74,8 +82,58 @@ long getPriority(
            TieredStoragePartitionId partitionId,
            TieredStorageSubpartitionId subpartitionId,
            int segmentId,
-            int bufferIndex);
+            int bufferIndex,
+            ReadProgress readProgress);

Review Comment:
JavaDoc not updated.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFile.java:
##########

@@ -39,4 +39,32 @@
     public static ProducerMergedPartitionFileReader createPartitionFileReader(
             Path dataFilePath, ProducerMergedPartitionFileIndex partitionFileIndex) {
         return new ProducerMergedPartitionFileReader(dataFilePath, partitionFileIndex);
     }
+
+    /**
+     * The implementation of {@link PartitionFileReader.ReadProgress} mainly includes the current
+     * reading offset, the end-of-region offset, etc.
+     */
+    public static class ProducerMergedReadProgress implements PartitionFileReader.ReadProgress {

Review Comment:
This probably belongs to `ProducerMergedPartitionFileReader`.
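For illustration, a sketch of the suggested relocation, with the fields, constructor, and getters taken from this PR's `ProducerMergedReadProgress`:

```java
public class ProducerMergedPartitionFileReader implements PartitionFileReader {

    /** The read progress of this reader: the current buffer offset and the end-of-region offset. */
    public static class ProducerMergedReadProgress implements ReadProgress {

        private final long currentBufferOffset;

        private final long endOfRegionOffset;

        public ProducerMergedReadProgress(long currentBufferOffset, long endOfRegionOffset) {
            this.currentBufferOffset = currentBufferOffset;
            this.endOfRegionOffset = endOfRegionOffset;
        }

        public long getCurrentBufferOffset() {
            return currentBufferOffset;
        }

        public long getEndOfRegionOffset() {
            return endOfRegionOffset;
        }
    }

    // ... the reader implementation from the diff above ...
}
```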
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileReader.java:
##########

@@ -176,91 +175,189 @@ private void lazyInitializeFileChannel() {
     }

     /**
-     * Try to get the cache according to the key.
+     * Slice the read memory segment into multiple small network buffers.
      *
-     * <p>If the relevant buffer offset cache exists, it will be returned and subsequently removed.
-     * However, if the buffer offset cache does not exist, a new cache will be created using the
-     * data index and returned.
+     * <p>Note that although the read data appears to be split into multiple buffers, the sliced
+     * buffers still share the same underlying memory segment.
      *
-     * @param cacheKey the key of cache.
-     * @param removeKey boolean decides whether to remove key.
-     * @return returns the relevant buffer offset cache if it exists, otherwise return {@link
-     *     Optional#empty()}.
+     * @param byteBuffer the byte buffer to be sliced, which points to the underlying memorySegment
+     * @param memorySegment the underlying memory segment to be sliced
+     * @param partialBuffer the partial buffer; if it is not null, it contains the partial data
+     *     buffer from the previous read
+     * @param readBuffers the list that accepts the sliced buffers
+     * @return a tuple whose first field is the total number of sliced bytes and whose second field
+     *     is the number of bytes in the partial buffer
      */
-    private Optional<BufferOffsetCache> tryGetCache(
-            Tuple2<TieredStorageSubpartitionId, Integer> cacheKey, boolean removeKey) {
-        BufferOffsetCache bufferOffsetCache = bufferOffsetCaches.remove(cacheKey);
-        if (bufferOffsetCache == null) {
-            Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> regionOpt =
-                    dataIndex.getRegion(cacheKey.f0, cacheKey.f1);
-            return regionOpt.map(region -> new BufferOffsetCache(cacheKey.f1, region));
-        } else {
-            if (removeKey) {
-                numCaches--;
-            } else {
-                bufferOffsetCaches.put(cacheKey, bufferOffsetCache);
+    private Tuple2<Integer, Integer> sliceBuffer(
+            ByteBuffer byteBuffer,
+            MemorySegment memorySegment,
+            @Nullable CompositeBuffer partialBuffer,
+            BufferRecycler bufferRecycler,
+            List<Buffer> readBuffers) {
+        checkState(reusedHeaderBuffer.position() == 0);
+        checkState(partialBuffer == null || partialBuffer.missingLength() > 0);
+
+        NetworkBuffer buffer = new NetworkBuffer(memorySegment, bufferRecycler);
+        buffer.setSize(byteBuffer.remaining());
+
+        try {
+            int numSlicedBytes = 0;
+            if (partialBuffer != null) {
+                // If there is a previous small partial buffer, the current read operation should
+                // read additional data and combine it with the existing partial buffer to
+                // construct a new complete buffer
+                buffer.retainBuffer();
+                int position = byteBuffer.position() + partialBuffer.missingLength();
+                int numPartialBytes = partialBuffer.missingLength();
+                partialBuffer.addPartialBuffer(
+                        buffer.readOnlySlice(byteBuffer.position(), numPartialBytes));
+                numSlicedBytes += numPartialBytes;
+                byteBuffer.position(position);
+                readBuffers.add(partialBuffer);
+            }
+
+            partialBuffer = null;
+            while (byteBuffer.hasRemaining()) {
+                // Parse the small buffer's header
+                BufferHeader header = parseBufferHeader(byteBuffer);
+                if (header == null) {
+                    // If the remaining data length in the buffer is not enough to construct a new
+                    // complete buffer header, drop it directly.
+                    break;
+                } else {
+                    numSlicedBytes += HEADER_LENGTH;
+                }
+
+                if (header.getLength() <= byteBuffer.remaining()) {
+                    // The remaining data length in the buffer is enough to generate a new small
+                    // sliced network buffer. The small sliced buffer is not a partial buffer, so
+                    // we can read the slice of the buffer directly
+                    buffer.retainBuffer();
+                    ReadOnlySlicedNetworkBuffer slicedBuffer =
+                            buffer.readOnlySlice(byteBuffer.position(), header.getLength());
+                    slicedBuffer.setDataType(header.getDataType());
+                    slicedBuffer.setCompressed(header.isCompressed());
+                    byteBuffer.position(byteBuffer.position() + header.getLength());
+                    numSlicedBytes += header.getLength();
+                    readBuffers.add(slicedBuffer);
+                } else {
+                    // The remaining data length in the buffer is smaller than the actual length of
+                    // the buffer, so we should generate a new partial buffer, allowing for
+                    // generating a new complete buffer during the next read operation
+                    buffer.retainBuffer();
+                    int numPartialBytes = byteBuffer.remaining();
+                    numSlicedBytes += numPartialBytes;
+                    partialBuffer = new CompositeBuffer(header);
+                    partialBuffer.addPartialBuffer(
+                            buffer.readOnlySlice(byteBuffer.position(), numPartialBytes));
+                    readBuffers.add(partialBuffer);
+                    break;
+                }
             }
-            return Optional.of(bufferOffsetCache);
+            return Tuple2.of(numSlicedBytes, partialBufferReadBytes(partialBuffer));
+        } catch (Throwable throwable) {
+            LOG.error("Failed to slice the read buffer {}.", byteBuffer, throwable);
+            throw throwable;
+        } finally {
+            buffer.recycleBuffer();
         }
     }

     /**
-     * The {@link BufferOffsetCache} represents the file offset cache for a buffer index. Each cache
-     * includes file offset of the buffer index, the region containing the buffer index and next
-     * buffer index to consume.
+     * Return a tuple of the start and end file offset, or return null if the buffer is not found
+     * in the data index.
      */
-    private class BufferOffsetCache {
+    @Nullable
+    private Tuple2<Long, Long> getReadStartAndEndOffset(
+            TieredStorageSubpartitionId subpartitionId,
+            int bufferIndex,
+            @Nullable ReadProgress currentReadProgress,
+            @Nullable CompositeBuffer partialBuffer) {
+        ProducerMergedPartitionFile.ProducerMergedReadProgress readProgress =
+                convertToCurrentReadProgress(currentReadProgress);
+        long readStartOffset;
+        long readEndOffset;
+        if (readProgress == null
+                || readProgress.getCurrentBufferOffset() == readProgress.getEndOfRegionOffset()) {
+            Optional<ProducerMergedPartitionFileIndex.FixedSizeRegion> regionOpt =
+                    dataIndex.getRegion(subpartitionId, bufferIndex);
+            if (!regionOpt.isPresent()) {
+                return null;
+            }
+            readStartOffset = regionOpt.get().getRegionStartOffset();
+            readEndOffset = regionOpt.get().getRegionEndOffset();
+        } else {
+            readStartOffset =
+                    readProgress.getCurrentBufferOffset() + partialBufferReadBytes(partialBuffer);
+            readEndOffset = readProgress.getEndOfRegionOffset();
+        }

-        private final ProducerMergedPartitionFileIndex.FixedSizeRegion region;
+        checkState(readStartOffset <= readEndOffset);
+        return Tuple2.of(readStartOffset, readEndOffset);
+    }

-        private long fileOffset;
+    private static ReadBufferResult getReadBufferResult(
+            List<Buffer> readBuffers,
+            long readStartOffset,
+            long readEndOffset,
+            int numBytesToRead,
+            int numBytesRealRead,
+            int numBytesReadPartialBuffer) {
+        boolean shouldContinueRead = readStartOffset + numBytesRealRead < readEndOffset;
+        ProducerMergedPartitionFile.ProducerMergedReadProgress readProgress =
+                new ProducerMergedPartitionFile.ProducerMergedReadProgress(
+                        readStartOffset + numBytesRealRead - numBytesReadPartialBuffer,
+                        readEndOffset);
+        checkState(
+                numBytesRealRead <= numBytesToRead
+                        && numBytesToRead - numBytesRealRead < HEADER_LENGTH);

-        private int nextBufferIndex;
+        return new ReadBufferResult(readBuffers, shouldContinueRead, readProgress);
+    }

-        private BufferOffsetCache(
-                int bufferIndex, ProducerMergedPartitionFileIndex.FixedSizeRegion region) {
-            this.nextBufferIndex = bufferIndex;
-            this.region = region;
-            moveFileOffsetToBuffer(bufferIndex);
+    private void readFileDataToBuffer(
+            MemorySegment memorySegment, BufferRecycler recycler, ByteBuffer byteBuffer)
+            throws IOException {
+        try {
+            BufferReaderWriterUtil.readByteBufferFully(fileChannel, byteBuffer);
+            byteBuffer.flip();
+        } catch (Throwable throwable) {
+            recycler.recycle(memorySegment);
+            throw throwable;
         }
+    }

-        /**
-         * Get the file offset.
-         *
-         * @return the file offset.
-         */
-        private long getFileOffset() {
-            return fileOffset;
-        }
+    private static int partialBufferReadBytes(@Nullable CompositeBuffer partialBuffer) {

Review Comment:
```suggestion
    private static int getPartialBufferReadBytes(@Nullable CompositeBuffer partialBuffer) {
```
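For completeness, an illustrative version of the renamed helper; the body is an assumption based on how the diff uses the return value:

```java
private static int getPartialBufferReadBytes(@Nullable CompositeBuffer partialBuffer) {
    // A null partial buffer means the last sliced buffer was complete, so no bytes
    // carry over into the next read's start offset.
    return partialBuffer == null ? 0 : partialBuffer.readableBytes();
}
```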