tillrohrmann commented on a change in pull request #14904: URL: https://github.com/apache/flink/pull/14904#discussion_r575294320
########## File path: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java ########## @@ -357,4 +385,9 @@ public final void put(int offset, ByteBuffer source, int numBytes) { } } } + + @Override + public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) { + return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size)); + } Review comment: Which calls to `wrap` cannot be replaced with this call? ########## File path: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java ########## @@ -57,6 +59,10 @@ */ @Nullable private ByteBuffer offHeapBuffer; + @Nullable private final Runnable cleaner; + + private final boolean allowWrap; Review comment: It might not be super clear which segments are allowed to be wrapped and which not. Adding some explanation could help. ########## File path: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java ########## @@ -357,4 +385,9 @@ public final void put(int offset, ByteBuffer source, int numBytes) { } } } + + @Override + public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) { + return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size)); + } Review comment: Do we know what the performance penalty for the additional `Function` is? ########## File path: flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java ########## @@ -1386,4 +1387,17 @@ public final boolean equalTo(MemorySegment seg2, int offset1, int offset2, int l public byte[] getHeapMemory() { return heapMemory; } + + /** + * Applies the given process function on a {@link ByteBuffer} that represents this entire + * segment. + * + * <p>Note: The {@link ByteBuffer} passed into the process function is temporary and could + * become invalid after the processing. Thus, the process function should not try to keep any + * reference of the {@link ByteBuffer}. + * + * @param processFunction to be applied to the segment as {@link ByteBuffer}. + * @return the value that the process function returns. + */ + public abstract <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction); Review comment: We could also add a `acceptAsByteBuffer(Consumer<ByteBuffer> consumer)` for the case where we don't want to return a value. ########## File path: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java ########## @@ -94,26 +126,22 @@ @Override public void free() { super.free(); + if (cleaner != null) { + cleaner.run(); + } Review comment: If we had a `DirectMemorySegment`, then we could only allow `wrap` on this type, for example. ########## File path: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java ########## @@ -94,26 +126,22 @@ @Override public void free() { super.free(); + if (cleaner != null) { + cleaner.run(); + } Review comment: Looking at the special casing of this class, why don't we introduce an `DirectMemorySegment` and a `UnsafeMemorySegment`? Also, why do we have the `HeapMemorySegment` and still allow the `HybridMemorySegment` to be used with heap memory? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org