[ https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633823#comment-16633823 ]
ASF GitHub Bot commented on FLINK-10339: ---------------------------------------- NicoK closed pull request #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool URL: https://github.com/apache/flink/pull/6762 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java index c235999db91..48b9a20e9da 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java @@ -74,6 +74,19 @@ public static MemorySegment allocateUnpooledSegment(int size, Object owner) { return new HybridMemorySegment(new byte[size], owner); } + /** + * Allocates some unpooled off-heap memory and creates a new memory segment that + * represents that memory. + * + * @param size The size of the off-heap memory segment to allocate. + * @param owner The owner to associate with the off-heap memory segment. + * @return A new memory segment, backed by unpooled off-heap memory. + */ + public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) { + ByteBuffer memory = ByteBuffer.allocateDirect(size); + return wrapPooledOffHeapMemory(memory, owner); + } + /** * Creates a memory segment that wraps the given byte array. 
* diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index a369ce5a5fb..1fddb612781 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -30,7 +30,6 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -89,8 +88,7 @@ public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) { try { for (int i = 0; i < numberOfSegmentsToAllocate; i++) { - ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize); - availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null)); + availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null)); } } catch (OutOfMemoryError err) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java index 2a6a71f05d6..f941e20846e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java @@ -257,7 +257,8 @@ public String toString() { synchronized (buffers) { for (int i = 0; i < numberOfBuffers; i++) { - buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this)); + buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledOffHeapMemory( + memorySegmentSize, null), this)); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java index c450880f98b..d1a304a4909 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java @@ -675,8 +675,7 @@ void clear() { @Override MemorySegment allocateNewSegment(Object owner) { - ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize); - return MemorySegmentFactory.wrapPooledOffHeapMemory(memory, owner); + return MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, owner); } @Override ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > SpillReadBufferPool cannot use off-heap memory > ---------------------------------------------- > > Key: FLINK-10339 > URL: https://issues.apache.org/jira/browse/FLINK-10339 > Project: Flink > Issue Type: Improvement > Components: Network > Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0 > Reporter: zhijiang > Assignee: zhijiang > Priority: Minor > Labels: pull-request-available > > Currently, the {{NetworkBufferPool}} always uses off-heap memory to avoid > copying memory from Flink's {{Buffer}} to Netty's internal {{ByteBuf}} when > sending data on the sender side. > > However, the {{SpillReadBufferPool}} in {{SpilledSubpartitionView}} still uses > heap memory for caching. We can make it off-heap by default, similar to > {{NetworkBufferPool}}, or decide the type based on the existing parameter > {{taskmanager.memory.off-heap}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)