[ 
https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399085#comment-15399085
 ] 

ramkrishna.s.vasudevan commented on FLINK-4094:
-----------------------------------------------

[~mxm]
Thanks for the comment. My comment was bit vague as I  added them while on 
travel.
Let me try to explain based on what I see in code. If am missing something or 
wrong, pls do correct me.
The MemoryManager manages both Heap and offheap memory segment. 
{code}
                @Override
                HybridMemorySegment allocateNewSegment(Object owner) {
                        ByteBuffer memory = 
ByteBuffer.allocateDirect(segmentSize);
                        return 
HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(memory, owner);
                }

                @Override
                HybridMemorySegment requestSegmentFromPool(Object owner) {
                        ByteBuffer buf = availableMemory.remove();
                        return 
HybridMemorySegment.FACTORY.wrapPooledOffHeapMemory(buf, owner);
                }

                @Override
                void returnSegmentToPool(MemorySegment segment) {
                        if (segment.getClass() == HybridMemorySegment.class) {
                                HybridMemorySegment hybridSegment = 
(HybridMemorySegment) segment;
                                ByteBuffer buf = 
hybridSegment.getOffHeapBuffer();
                                availableMemory.add(buf);
                                hybridSegment.free();
                        }
                        else {
                                throw new IllegalArgumentException("Memory 
segment is not a " + HeapMemorySegment.class.getSimpleName());
                        }
                }
{code}
If you see the usage of the above APIs
{code}
                        if (isPreAllocated) {
                                for (int i = numPages; i > 0; i--) {
                                        MemorySegment segment = 
memoryPool.requestSegmentFromPool(owner);
                                        target.add(segment);
                                        segmentsForOwner.add(segment);
                                }
                        }
                        else {
                                for (int i = numPages; i > 0; i--) {
                                        MemorySegment segment = 
memoryPool.allocateNewSegment(owner);
                                        target.add(segment);
                                        segmentsForOwner.add(segment);
                                }
                                numNonAllocatedPages -= numPages;
                        }
{code}
So if there is preAllocation enabled the memory buffer is requested from the 
pool or every time there is newsegment allocated.
Coming to the release of these buffers
{code}
if (isPreAllocated) {
                                        // release the memory in any case
                                        memoryPool.returnSegmentToPool(segment);
                                }
                                else {
                                        segment.free();
                                        numNonAllocatedPages++;
                                }
{code}
Again only if preAllocation is enabled we are returning to pool. Ya as you 
clearly pointed out it is just dynamic allocation that we do and on memory 
manager shutdown we clear the allocated buffers. But for offheap this will not 
be enough as the GC will not be able to garbage collect them unless the fullGC 
happens. 
I would rather say that it is better we do internal management of offheap 
buffers. We should create a pool from which the buffers are allocated and if 
the pool is of fixed size and we have requests for more buffers than the size 
of the pool we should allocate them onheap only. (if that is acceptable).

Currently the memory management pool is done by ArrayDeque. We only allow 
initialSize and I think it can grow beyond too. So for offheap buffers we 
should have a fixed size pool and as and when the demand grows we should 
allocate few buffers onheap and once the pool is again able to offer buffers we 
use them. 

bq.I don't think just disallowing preallocation:false is a good fix.
Yes. I agree. That is a hacky one.

> Off heap memory deallocation might not properly work
> ----------------------------------------------------
>
>                 Key: FLINK-4094
>                 URL: https://issues.apache.org/jira/browse/FLINK-4094
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.1.0
>
>
> A user reported that off-heap memory is not properly deallocated when setting 
> {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause 
> the TaskManager process being killed by the OS.
> It should be possible to execute multiple batch jobs with preallocation 
> turned off. No longer used direct memory buffers should be properly garbage 
> collected so that the JVM process does not exceed it's maximum memory bounds.
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to