curcur commented on a change in pull request #13614:
URL: https://github.com/apache/flink/pull/13614#discussion_r504670777
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
##########
@@ -108,6 +109,30 @@ public Buffer build() {
 		return slice.retainBuffer();
 	}
+	/**
+	 * @param bytesToSkip number of bytes to skip from currentReaderPosition
+	 * @return sliced {@link Buffer} containing the not yet consumed data, Returned {@link Buffer} shares the reference
+	 * counter with the parent {@link BufferConsumer} - in order to recycle memory both of them must be recycled/closed.
+	 */
+	Buffer skipBuild(int bytesToSkip) {
+		writerPosition.update();
+		int cachedWriterPosition = writerPosition.getCached();
+		Buffer slice;
+
+		int bytesReadable = cachedWriterPosition - currentReaderPosition;
+		checkState(bytesToSkip <= bytesReadable, "bytes to skip beyond readable range");
+
+		if (bytesToSkip < buffer.getMaxCapacity()) {
+			slice = buffer.readOnlySlice(currentReaderPosition + bytesToSkip, bytesReadable - bytesToSkip);
+		} else {
+			// return an empty buffer if beyond buffer max capacity
+			slice = buffer.readOnlySlice(currentReaderPosition, 0);
+		}

Review comment:
   You are right, `bytesToSkip > buffer.getMaxCapacity()` is not possible.

   **However**, the problem is the case `bytesToSkip == buffer.getMaxCapacity()`, which happens when a long record occupies the entire buffer. Then `currentReaderPosition + bytesToSkip` equals `buffer.getMaxCapacity()`, which means I would be reading from index `buffer.getMaxCapacity()`. I guess I would get an out-of-range exception if I did that LOL
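   To make the boundary case concrete, here is a minimal sketch of just the index arithmetic in `skipBuild`. It is not the Flink code: `SkipBuildSketch` and `sliceBounds` are hypothetical names, plain ints stand in for the buffer and its positions, and the returned pair stands in for the two arguments passed to `readOnlySlice`. It does not settle whether `readOnlySlice` actually throws for a zero-length slice starting at `getMaxCapacity()`; that depends on the bounds check of the underlying buffer.

```java
// Hypothetical stand-in for the index arithmetic in skipBuild; not Flink code.
class SkipBuildSketch {

    /** Returns {startIndex, length}, i.e. the arguments that would go to readOnlySlice. */
    static int[] sliceBounds(int readerPos, int writerPos, int maxCapacity, int bytesToSkip) {
        int bytesReadable = writerPos - readerPos;
        if (bytesToSkip > bytesReadable) {
            throw new IllegalStateException("bytes to skip beyond readable range");
        }
        if (bytesToSkip < maxCapacity) {
            // Normal case: skip into the readable region and slice what is left.
            return new int[] {readerPos + bytesToSkip, bytesReadable - bytesToSkip};
        }
        // bytesToSkip == maxCapacity: the record fills the whole buffer, so
        // readerPos is 0 and nothing remains. Return an empty slice at readerPos
        // instead of using start index readerPos + bytesToSkip == maxCapacity.
        return new int[] {readerPos, 0};
    }

    public static void main(String[] args) {
        int maxCapacity = 8;

        // Partially consumed buffer: reader at 2, writer at 8, skip 3 bytes.
        int[] partial = sliceBounds(2, 8, maxCapacity, 3);
        System.out.println("partial: start=" + partial[0] + ", length=" + partial[1]);

        // A record that occupies the entire buffer: skip == maxCapacity.
        int[] full = sliceBounds(0, 8, maxCapacity, 8);
        System.out.println("full: start=" + full[0] + ", length=" + full[1]);
    }
}
```

   Without the `else` branch, the second call would compute a start index of `0 + 8`, which is exactly `maxCapacity`, the case described above.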