GitHub user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2495

@StephanEwen Just trying to explain what my intention was. This is particularly to address the sorters.

Solution

We could create memory allocators inside the batch task, one per input, so every iterative task has a memory allocator/creator for each of its inputs. The allocator creates the array of memory segments that the sorter for that input requires, and as the task keeps looping we reuse the same set of segments. Only when we receive a termination event do we release the segments created by this task. This ensures that memory is allocated once per task and used throughout the life cycle of the iterative task.

We change the merge sorters (CombiningUnilateralSortMerger and UnilateralSortMerger) so that the MemoryAllocator is passed to them. When the sorters start sorting, writing, and handling large records (if any), they pull the memory segments from the memory allocator. For iterative tasks, wherever we currently release segments, we just put them back into the memory allocator instead of releasing them to the memory manager. Only when the task receives the termination call do we forcefully close the allocators, so that all created segments are released back to the memory manager. So even if preallocation of memory is set to true, I think this would work: we won't request new segments from the MemoryManager's pool and will instead use the segments that were created for the first iteration.

For normal non-iterative tasks, we know the allocators were created for a non-iterative task: we have a Boolean to indicate whether the task is iterative or not.
Based on this flag, at the point where we release the segments, we can decide whether to release them back to the memory manager or only put them back into the memory allocator (in the case of iterative tasks).

Please note that I was able to fix the failed test cases that @ggevay pointed out, but I have not updated the PR yet. I can wait for your feedback and thoughts and then proceed with both PRs, this one and #2510.

Points to note:
- Not sure whether this aligns with Steven's future vision of memory management.
- Impact on streaming iterative tasks.
- Will the number of memory segments needed for this task change dynamically? If so, the above mechanism cannot work.
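
To make the proposal above concrete, here is a minimal, self-contained sketch of the allocator idea. It is not the code in the PR and does not use Flink's actual MemoryManager or MemorySegment APIs: `SegmentPool` and `TaskMemoryAllocator` are hypothetical names, and plain `byte[]` arrays stand in for memory segments. It only illustrates the flag-based decision between returning a segment to the allocator (iterative task) and releasing it to the pool (non-iterative task).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for the memory manager's pool; NOT Flink's MemoryManager API. */
final class SegmentPool {
    private final int segmentSize;
    private int available;

    SegmentPool(int totalSegments, int segmentSize) {
        this.available = totalSegments;
        this.segmentSize = segmentSize;
    }

    /** Hands out {@code count} segments or fails if the pool is exhausted. */
    List<byte[]> allocate(int count) {
        if (count > available) {
            throw new IllegalStateException("not enough segments in pool");
        }
        available -= count;
        List<byte[]> segments = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            segments.add(new byte[segmentSize]);
        }
        return segments;
    }

    /** Returns one segment's worth of capacity to the pool. */
    void release(byte[] segment) {
        available++;
    }

    int available() {
        return available;
    }
}

/** Hypothetical per-input allocator owned by the task, as sketched in the comment above. */
final class TaskMemoryAllocator {
    private final SegmentPool pool;
    private final boolean iterative;
    private final Deque<byte[]> free = new ArrayDeque<>();
    private boolean closed;

    TaskMemoryAllocator(SegmentPool pool, int numSegments, boolean iterative) {
        this.pool = pool;
        this.iterative = iterative;
        // Segments are requested from the pool once, when the task creates the allocator.
        free.addAll(pool.allocate(numSegments));
    }

    /** Called by a sorter when it needs a segment. */
    byte[] nextSegment() {
        byte[] segment = closed ? null : free.pollFirst();
        if (segment == null) {
            throw new IllegalStateException("allocator is closed or has no free segments");
        }
        return segment;
    }

    /** Called where a sorter would otherwise release a segment to the memory manager. */
    void release(byte[] segment) {
        if (iterative && !closed) {
            free.addLast(segment);   // keep it for the next iteration
        } else {
            pool.release(segment);   // non-iterative: give it back right away
        }
    }

    /** Called on termination; segments still handed out must be released by their holder. */
    void close() {
        closed = true;
        while (!free.isEmpty()) {
            pool.release(free.pollFirst());
        }
    }
}
```

In this sketch, an iterative task would construct one `TaskMemoryAllocator` per input with `iterative = true`, pass it to the sorter for that input, and call `close()` only when the termination event arrives. A fixed segment count is assumed at construction time, which is exactly why the last point above matters: if the number of segments needed can change between iterations, this scheme would need rethinking.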