GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5276
[FLINK-8371][network] always recycle Buffers when releasing SpillableSubpartition

## What is the purpose of the change

This is a rebase of #5261 for the release-1.4 branch. Please refer to the original PR for details.

Please also note that this PR includes #5275.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-8371-1.4

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5276

----
commit 2c667dea8e711fc928a37838c9f6cd5f6a9f7685
Author: Nico Kruber <nico@...>
Date:   2017-11-23T12:09:37Z

    [hotfix] only update buffer statistics in SpillableSubpartition#add() if successful

commit 9398dafae6918488582bc89c1ab73e3e2bd5ea16
Author: Nico Kruber <nico@...>
Date:   2017-11-23T13:59:18Z

    [hotfix] add some more buffer recycling checks in SpillableSubpartitionTest

commit 074bfbb82c4c18acf9ba383473fdefa89e1bf095
Author: Nico Kruber <nico@...>
Date:   2017-11-23T13:58:21Z

    [FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle the buffer in case of failures

    This fixes a double-recycle in SpillableSubpartitionView and also makes sure
    that even if adding the (asynchronous) write operation fails, the buffer is
    properly freed in code that did not perform this cleanup. It avoids code
    duplication of this cleanup and it is also more consistent to take over
    responsibility of the given buffer even if an exception is thrown.

    [FLINK-7499][io] complete the idiom of ResultSubpartition#add() taking over ownership of the buffer

    The buffer will now always be released once and at the right time and the caller
    must not worry about the buffer release if a called function threw an exception.

commit be12c1a8169b9d4d3cfb6c39675273ffc953531a
Author: Nico Kruber <nico@...>
Date:   2018-01-05T17:18:35Z

    [FLINK-8371][network] always recycle Buffers when releasing SpillableSubpartition

    There were places where Buffer instances were not released upon
    SpillableSubpartition#release() with a view attached to a non-spilled
    subpartition:

    1) SpillableSubpartition#buffer:
       SpillableSubpartition#release() delegates the recycling to the view, but
       SpillableSubpartitionView does not clean up the 'buffers' queue (the
       recycling was only done by the subpartition if there was no view).
    2) SpillableSubpartitionView#nextBuffer:
       If this field is populated when the subpartition is released, it will neither
       be given out in subsequent SpillableSubpartitionView#getNextBuffer() calls
       (there was a short path returning 'null' here), nor was it recycled.

    -> similarly to the PipelinedSubpartition implementation, make
       SpillableSubpartition#release() always clean up and recycle the buffers
    -> recycle SpillableSubpartitionView#nextBuffer in
       SpillableSubpartitionView#releaseAllResources()

----
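
For readers without the diff at hand, the ownership rules described in the commit messages above can be illustrated with a minimal, single-threaded sketch. It is not the Flink code: `SketchBuffer`, `SketchView`, and `SketchSubpartition` are hypothetical stand-ins, spilling and locking are left out, and the only goal is to show that (a) `add()` takes over the buffer even when it fails, (b) statistics are only updated on a successful `add()`, and (c) `release()` recycles both the queued buffers and the buffer the view has already fetched.

```java
import java.util.ArrayDeque;

/** Illustrative sketch only; none of these types are Flink's real classes. */
final class RecycleOnReleaseSketch {

    /** Hypothetical stand-in for a pooled buffer; counts how often it was recycled. */
    static final class SketchBuffer {
        private int recycleCount;

        void recycle() {
            recycleCount++;
        }

        int recycleCount() {
            return recycleCount;
        }
    }

    /** Hypothetical read view that may hold one already-fetched buffer. */
    static final class SketchView {
        private SketchBuffer nextBuffer;

        void prefetch(SketchBuffer buffer) {
            nextBuffer = buffer;
        }

        /** Recycles the fetched buffer, if any, so it cannot leak on release. */
        void releaseAllResources() {
            if (nextBuffer != null) {
                nextBuffer.recycle();
                nextBuffer = null;
            }
        }
    }

    /** Hypothetical in-memory subpartition (no spilling, no synchronization). */
    static final class SketchSubpartition {
        private final ArrayDeque<SketchBuffer> buffers = new ArrayDeque<>();
        private SketchView view;
        private boolean released;
        private int totalBuffers;

        /** Takes ownership of the buffer: on failure it is recycled here, not by the caller. */
        boolean add(SketchBuffer buffer) {
            if (released) {
                buffer.recycle();
                return false;
            }
            buffers.add(buffer);
            totalBuffers++; // statistics are only updated after a successful add
            return true;
        }

        /** Attaches a view and hands it the first queued buffer, if there is one. */
        SketchView createView() {
            view = new SketchView();
            SketchBuffer first = buffers.poll();
            if (first != null) {
                view.prefetch(first);
            }
            return view;
        }

        /** Always recycles the queued buffers itself, then lets the view clean up. */
        void release() {
            if (released) {
                return; // idempotent: a second call must not recycle anything twice
            }
            released = true;
            SketchBuffer buffer;
            while ((buffer = buffers.poll()) != null) {
                buffer.recycle();
            }
            if (view != null) {
                view.releaseAllResources();
            }
        }

        int totalBuffers() {
            return totalBuffers;
        }
    }

    public static void main(String[] args) {
        SketchSubpartition partition = new SketchSubpartition();
        SketchBuffer a = new SketchBuffer();
        SketchBuffer b = new SketchBuffer();
        SketchBuffer late = new SketchBuffer();

        partition.add(a);
        partition.add(b);
        partition.createView(); // the view now holds 'a'; 'b' stays queued
        partition.release();    // recycles 'b' from the queue and 'a' via the view
        partition.add(late);    // rejected after release, but still recycled

        System.out.println("a=" + a.recycleCount() + " b=" + b.recycleCount()
                + " late=" + late.recycleCount() + " total=" + partition.totalBuffers());
    }
}
```

In this sketch every buffer ends up recycled exactly once regardless of whether it was still queued, already fetched by the view, or added after release, which is the invariant the additional checks in SpillableSubpartitionTest are described as guarding.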