[ https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399085#comment-15399085 ]
ramkrishna.s.vasudevan commented on FLINK-4094:
-----------------------------------------------
[~mxm]
Thanks for the comment. My earlier comment was a bit vague because I wrote it while traveling.
Let me try to explain based on what I see in the code. If I am missing something or am wrong, please correct me.
The MemoryManager manages both heap and off-heap memory segments. For the off-heap case, the pool looks like this:
{code}
@Override
HybridMemorySegment allocateNewSegment(Object owner) {
    // a new direct (off-heap) buffer is allocated on every call
    ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
    return HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(memory, owner);
}

@Override
HybridMemorySegment requestSegmentFromPool(Object owner) {
    // reuse a buffer that was allocated up front and kept in the pool
    ByteBuffer buf = availableMemory.remove();
    return HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(buf, owner);
}

@Override
void returnSegmentToPool(MemorySegment segment) {
    if (segment.getClass() == HybridMemorySegment.class) {
        HybridMemorySegment hybridSegment = (HybridMemorySegment) segment;
        ByteBuffer buf = hybridSegment.getOffHeapBuffer();
        // hand the underlying buffer back to the pool, then invalidate the segment
        availableMemory.add(buf);
        hybridSegment.free();
    }
    else {
        throw new IllegalArgumentException("Memory segment is not a " + HeapMemorySegment.class.getSimpleName());
    }
}
{code}
This is how these APIs are used when pages are allocated:
{code}
if (isPreAllocated) {
    // take already allocated buffers from the pool
    for (int i = numPages; i > 0; i--) {
        MemorySegment segment = memoryPool.requestSegmentFromPool(owner);
        target.add(segment);
        segmentsForOwner.add(segment);
    }
}
else {
    // allocate a fresh off-heap buffer for every page
    for (int i = numPages; i > 0; i--) {
        MemorySegment segment = memoryPool.allocateNewSegment(owner);
        target.add(segment);
        segmentsForOwner.add(segment);
    }
    numNonAllocatedPages -= numPages;
}
{code}
So if preallocation is enabled, the memory buffer is requested from the pool; otherwise a new segment is allocated every time.
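To make the two paths concrete, here is a minimal, self-contained model of them. It is only a sketch: `SimplifiedAllocator` and `allocatePages` are hypothetical names, plain `ByteBuffer`s stand in for `MemorySegment`s, owners are ignored, and it assumes the pool is filled once at construction time when preallocation is on.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the two allocation paths quoted above.
// Plain ByteBuffers stand in for memory segments; owners are left out.
class SimplifiedAllocator {
    final boolean isPreAllocated;
    final int segmentSize;
    final ArrayDeque<ByteBuffer> availableMemory = new ArrayDeque<>();
    int numNonAllocatedPages;

    SimplifiedAllocator(boolean isPreAllocated, int numPages, int segmentSize) {
        this.isPreAllocated = isPreAllocated;
        this.segmentSize = segmentSize;
        if (isPreAllocated) {
            // all direct buffers are created once, up front
            for (int i = 0; i < numPages; i++) {
                availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
            }
            numNonAllocatedPages = 0;
        } else {
            // nothing is allocated yet; only count how many pages may still be handed out
            numNonAllocatedPages = numPages;
        }
    }

    List<ByteBuffer> allocatePages(int numPages) {
        List<ByteBuffer> target = new ArrayList<>();
        if (isPreAllocated) {
            for (int i = numPages; i > 0; i--) {
                target.add(availableMemory.remove()); // reuse a pooled buffer
            }
        } else {
            for (int i = numPages; i > 0; i--) {
                target.add(ByteBuffer.allocateDirect(segmentSize)); // fresh native allocation
            }
            numNonAllocatedPages -= numPages;
        }
        return target;
    }
}
```

With preallocation the same buffers keep circulating through the deque; without it, every request creates new native memory, so everything depends on how that memory is released again.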
Coming to the release of these buffers:
{code}
if (isPreAllocated) {
    // release the memory in any case
    memoryPool.returnSegmentToPool(segment);
}
else {
    segment.free();
    numNonAllocatedPages++;
}
{code}
Again, segments are returned to the pool only if preallocation is enabled. Yes, as you clearly pointed out, without preallocation we just allocate dynamically, and on MemoryManager shutdown we clear the allocated buffers. But for off-heap memory this is not enough: freeing a segment does not release the native memory behind its direct buffer, and the GC may not reclaim those buffers until a full GC happens.
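A rough way to observe this is the JVM's own accounting of direct buffers, exposed through the standard `BufferPoolMXBean` named "direct". The sketch below (`DirectMemoryProbe` is a hypothetical helper, not part of Flink) allocates a few direct buffers and then drops every reference, which is roughly what happens on the `segment.free()` path. Assuming no GC runs in between, the reported usage is expected to stay where it was right after allocation.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reads the JVM's accounting of direct (off-heap) ByteBuffer memory.
class DirectMemoryProbe {
    static long directBytesUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if (pool.getName().equals("direct")) {
                return pool.getMemoryUsed();
            }
        }
        return -1; // pool not exposed by this JVM
    }

    // Allocates `count` direct buffers, drops every reference to them and returns
    // {before, afterAllocate, afterDrop} in bytes. No GC is requested, so the
    // native memory is expected to still be reserved after the references are gone.
    static long[] allocateAndDrop(int count, int size) {
        long before = directBytesUsed();
        List<ByteBuffer> buffers = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            buffers.add(ByteBuffer.allocateDirect(size));
        }
        long afterAllocate = directBytesUsed();
        buffers.clear(); // analogous to segment.free(): only the Java references go away
        long afterDrop = directBytesUsed();
        return new long[] {before, afterAllocate, afterDrop};
    }
}
```

Calling `System.gc()` afterwards may or may not bring the number down, since it is only a hint to the JVM; that is exactly the non-determinism we cannot rely on when the process is close to its memory limit.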
I think it is better to manage the off-heap buffers internally. We should create a fixed-size pool from which the buffers are allocated, and if there are requests for more buffers than the pool holds, we should allocate the extra ones on-heap (if that is acceptable).
Currently the memory pool is backed by an ArrayDeque. We only specify an initial size, and I think it can grow beyond that. So for off-heap buffers we should have a fixed-size pool: as demand grows beyond it, we allocate a few buffers on-heap, and once the pool can offer buffers again, we use those.
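A minimal sketch of such a pool, assuming a hypothetical `BoundedOffHeapPool` class (this is not an existing Flink API) and plain `ByteBuffer`s instead of memory segments:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical sketch of the proposal: a fixed-size pool of direct buffers that
// falls back to heap buffers when it runs dry. Not Flink's MemoryManager API.
class BoundedOffHeapPool {
    private final ArrayDeque<ByteBuffer> available;
    private final int capacity;
    private final int segmentSize;

    BoundedOffHeapPool(int capacity, int segmentSize) {
        this.capacity = capacity;
        this.segmentSize = segmentSize;
        this.available = new ArrayDeque<>(capacity);
        // off-heap memory is allocated exactly once, bounded by capacity * segmentSize
        for (int i = 0; i < capacity; i++) {
            available.add(ByteBuffer.allocateDirect(segmentSize));
        }
    }

    // Returns a pooled direct buffer if one is free, otherwise a heap buffer
    // that the GC can reclaim like any other object.
    synchronized ByteBuffer acquire() {
        ByteBuffer buf = available.poll();
        return buf != null ? buf : ByteBuffer.allocate(segmentSize);
    }

    // Direct buffers go back into the pool for reuse (never beyond capacity);
    // heap fallbacks are simply dropped.
    synchronized void release(ByteBuffer buf) {
        if (buf.isDirect() && available.size() < capacity) {
            buf.clear(); // reset position and limit for the next user
            available.add(buf);
        }
    }

    synchronized int availableCount() {
        return available.size();
    }
}
```

The direct memory stays bounded by `capacity * segmentSize` for the lifetime of the pool, and anything beyond that lives on the heap, where the regular GC accounts for it. The open question is whether all consumers can work transparently with heap-backed buffers while the pool is exhausted.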
bq. I don't think just disallowing preallocation:false is a good fix.
Yes, I agree. That would be a hacky fix.
> Off heap memory deallocation might not properly work
> ----------------------------------------------------
>
> Key: FLINK-4094
> URL: https://issues.apache.org/jira/browse/FLINK-4094
> Project: Flink
> Issue Type: Bug
> Components: Local Runtime
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: ramkrishna.s.vasudevan
> Priority: Critical
> Fix For: 1.1.0
>
>
> A user reported that off-heap memory is not properly deallocated when setting
> {{taskmanager.memory.preallocate:false}} (the default) [1]. This can cause
> the TaskManager process to be killed by the OS.
> It should be possible to execute multiple batch jobs with preallocation
> turned off. Direct memory buffers that are no longer used should be properly garbage
> collected so that the JVM process does not exceed its maximum memory bounds.
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html