tillrohrmann commented on a change in pull request #14904:
URL: https://github.com/apache/flink/pull/14904#discussion_r575294320



##########
File path: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
##########
@@ -357,4 +385,9 @@ public final void put(int offset, ByteBuffer source, int numBytes) {
             }
         }
     }
+
+    @Override
+    public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
+        return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size));
+    }

Review comment:
       Which calls to `wrap` cannot be replaced with this call?
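
For illustration, here is the kind of call-site migration the question is about. The surrounding method and the channel-writing scenario are made up for this sketch; only `wrap`, `size` and the new `processAsByteBuffer` come from the code under review. Call sites that keep the wrapped `ByteBuffer` beyond a single operation are presumably the ones that cannot be converted this way.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

import org.apache.flink.core.memory.MemorySegment;

public class WrapMigrationSketch {

    // Before: the caller gets a ByteBuffer view of the segment and could
    // accidentally hold on to it after the segment has been freed.
    static void writeOld(MemorySegment segment, WritableByteChannel channel) throws IOException {
        ByteBuffer view = segment.wrap(0, segment.size());
        channel.write(view);
    }

    // After: the view only exists inside the callback, so it cannot escape by accident.
    static void writeNew(MemorySegment segment, WritableByteChannel channel) {
        segment.processAsByteBuffer(
                buffer -> {
                    try {
                        return channel.write(buffer);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
    }
}
```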

##########
File path: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
##########
@@ -57,6 +59,10 @@
      */
     @Nullable private ByteBuffer offHeapBuffer;
 
+    @Nullable private final Runnable cleaner;
+
+    private final boolean allowWrap;

Review comment:
       It might not be super clear which segments are allowed to be wrapped and which are not. Adding some explanation could help.
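
For illustration, a sketch of the kind of explanation this asks for (the wording is an assumed reading of the intended semantics, not text from the PR):

```java
/**
 * Whether this segment may be handed out as a {@link java.nio.ByteBuffer} via
 * {@code wrap(...)}.
 *
 * <p>Assumed semantics, to be confirmed: segments backed by heap memory or by a
 * pre-allocated direct {@link java.nio.ByteBuffer} can be wrapped safely, while
 * segments whose memory is released through the {@code cleaner} must not hand out
 * long-lived buffer views, because such a view would dangle once the cleaner has run.
 */
private final boolean allowWrap;
```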

##########
File path: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
##########
@@ -357,4 +385,9 @@ public final void put(int offset, ByteBuffer source, int numBytes) {
             }
         }
     }
+
+    @Override
+    public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
+        return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size));
+    }

Review comment:
       Do we know what the performance penalty for the additional `Function` is?
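
One way to answer this empirically is a micro-benchmark along the following lines (a minimal JMH-style sketch; `MemorySegmentFactory.allocateUnpooledSegment` is existing Flink API, the benchmark class itself is hypothetical). For a monomorphic call site the JIT can usually inline the lambda, so the overhead is expected to be small, but measuring would settle it.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class ProcessAsByteBufferBenchmark {

    private MemorySegment segment;

    @Setup
    public void setUp() {
        segment = MemorySegmentFactory.allocateUnpooledSegment(4096);
    }

    @Benchmark
    public int wrapDirectly() {
        // Baseline: obtain the view and read from it without the extra Function.
        ByteBuffer buffer = segment.wrap(0, segment.size());
        return buffer.getInt(0);
    }

    @Benchmark
    public int viaProcessFunction() {
        // Same read routed through the callback; the difference is the lambda dispatch.
        return segment.processAsByteBuffer(buffer -> buffer.getInt(0));
    }
}
```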

##########
File path: flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
##########
@@ -1386,4 +1387,17 @@ public final boolean equalTo(MemorySegment seg2, int offset1, int offset2, int l
     public byte[] getHeapMemory() {
         return heapMemory;
     }
+
+    /**
+     * Applies the given process function on a {@link ByteBuffer} that represents this entire
+     * segment.
+     *
+     * <p>Note: The {@link ByteBuffer} passed into the process function is temporary and could
+     * become invalid after the processing. Thus, the process function should not try to keep any
+     * reference of the {@link ByteBuffer}.
+     *
+     * @param processFunction to be applied to the segment as {@link ByteBuffer}.
+     * @return the value that the process function returns.
+     */
+    public abstract <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction);

Review comment:
       We could also add an `acceptAsByteBuffer(Consumer<ByteBuffer> consumer)` method for the case where we don't want to return a value.
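
A sketch of what such a method could look like, expressed on top of `processAsByteBuffer` so that subclasses need no additional override (placement in `MemorySegment` and the exact shape are assumptions, not part of the PR; assumes `java.util.function.Consumer` is imported):

```java
/**
 * Applies the given consumer to a {@link ByteBuffer} that represents this entire segment.
 *
 * <p>The same lifetime caveat as for {@link #processAsByteBuffer(Function)} applies:
 * the buffer must not be kept beyond the call.
 */
public void acceptAsByteBuffer(Consumer<ByteBuffer> consumer) {
    processAsByteBuffer(
            buffer -> {
                consumer.accept(buffer);
                return null; // the consumer variant does not produce a result
            });
}
```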

##########
File path: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
##########
@@ -94,26 +126,22 @@
     @Override
     public void free() {
         super.free();
+        if (cleaner != null) {
+            cleaner.run();
+        }

Review comment:
       If we had a `DirectMemorySegment`, then we could only allow `wrap` on 
this type, for example.

##########
File path: flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
##########
@@ -94,26 +126,22 @@
     @Override
     public void free() {
         super.free();
+        if (cleaner != null) {
+            cleaner.run();
+        }

Review comment:
       Looking at the special casing of this class, why don't we introduce a `DirectMemorySegment` and an `UnsafeMemorySegment`? Also, why do we have the `HeapMemorySegment` and still allow the `HybridMemorySegment` to be used with heap memory?
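
For illustration, a rough outline of the split these last two comments suggest (class names and the exact division of responsibilities are hypothetical; the PR under review keeps all cases inside `HybridMemorySegment`):

```java
// Hypothetical hierarchy, not the code in this PR; skeletons only.

// Common bounds checks, get/put, size() and free() logic.
abstract class MemorySegment { /* ... */ }

// Backed by a byte[]; wrapping into a heap ByteBuffer is always safe.
final class HeapMemorySegment extends MemorySegment { /* ... */ }

// Backed by a pre-allocated direct ByteBuffer; wrap() can simply slice that buffer,
// so this could be the only type that exposes wrap().
final class DirectMemorySegment extends MemorySegment { /* ... */ }

// Raw Unsafe allocation released by a cleaner in free(); no wrap() at all, only the
// callback-based processAsByteBuffer(...) so that no buffer view can outlive the memory.
final class UnsafeMemorySegment extends MemorySegment { /* ... */ }
```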




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

