GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5261
[FLINK-8371][network] always recycle Buffers when releasing SpillableSubpartition

## What is the purpose of the change

There were places where `Buffer` instances were not released upon `SpillableSubpartition#release()` with a view attached to a non-spilled subpartition:

1) `SpillableSubpartition#buffers`: `SpillableSubpartition#release()` delegates the recycling to the view, but `SpillableSubpartitionView` does not clean up the `buffers` queue (the recycling was only done by the subpartition if there was no view).
2) `SpillableSubpartitionView#nextBuffer`: If this field is populated when the subpartition is released, it is neither given out in subsequent `SpillableSubpartitionView#getNextBuffer()` calls (there was a short path returning `null` here) nor recycled.

- Please refer to dataArtisans/flink#3.
- This PR is based on #5260.
- It should probably be applied to Flink-1.4 as well.

## Brief change log

- similarly to the `PipelinedSubpartition` implementation, make `SpillableSubpartition#release()` always clean up and recycle the buffers
- recycle `SpillableSubpartitionView#nextBuffer` in `SpillableSubpartitionView#releaseAllResources()`

## Verifying this change

This change added tests and can be verified as follows:

- added tests for various scenarios releasing a spillable/spilled or pipelined subpartition

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-8371

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5261.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5261

----
commit fada3022186670851a3956a961f490e6d86e2a53
Author: Nico Kruber <nico@...>
Date:   2017-12-13T14:28:08Z

    [hotfix][checkstyle] only ignore checkstyle in existing packages under runtime.io.network

    - ignore runtime.io.(async|disk)
    - ignore runtime.io.network.(api|buffer|netty|partition|serialization|util)
    -> everything else will be checked against the ruleset
    - fix checkstyle errors in classes directly under runtime.io.network

commit f8dff47a707c4e7572d02e072197927ec2ce2ef7
Author: Nico Kruber <nico@...>
Date:   2017-12-14T16:30:19Z

    [FLINK-8252][benchmarks] convert network benchmarks to streaming benchmarks

    This allows us to use the output flushing interval as a parameter to evaluate,
    too.

commit a4980ae85bac1dca9f8939e0dc3c8839991ed5e8
Author: Zhijiang <wangzhijiang999@...>
Date:   2017-08-17T11:38:45Z

    [FLINK-7468][network] Implement sender backlog logic for credit-based

commit 0da032b3e7b90da2cbee5ca6f051667add104ac6
Author: Piotr Nowojski <piotr.nowojski@...>
Date:   2018-01-05T14:28:40Z

    [FLINK-8375][network] Remove unnecessary synchronization

    Synchronized blocks in ResultPartition could affect only:
    1. totalNumberOfBuffers and totalNumberOfBytes counters
    2. subpartition add(), finish() and release() calls.

    However:
    1. counters were not used anywhere - they are removed by this commit
    2a. add(), finish() and release() methods for PipelinedSubpartition were already thread safe
    2b. add(), finish() and release() methods for SpillableSubpartition were made thread safe in this commit, by basically pushing the synchronized section down one level.

commit 2938610f996361e68dedefed6c247c3547cea331
Author: Nico Kruber <nico@...>
Date:   2018-01-05T15:17:13Z

    [hotfix][tests] move assertions out of the finally block

    There was a potential for them to mask exceptions.

commit f2dcc6ed3d5b5c136e4d899375ae85c3fe1e0a3e
Author: Nico Kruber <nico@...>
Date:   2018-01-05T17:18:35Z

    [FLINK-8371][network] always recycle Buffers when releasing SpillableSubpartition

    There were places where Buffer instances were not released upon
    SpillableSubpartition#release() with a view attached to a non-spilled
    subpartition:

    1) SpillableSubpartition#buffers:
      SpillableSubpartition#release() delegates the recycling to the view, but
      SpillableSubpartitionView does not clean up the 'buffers' queue (the
      recycling was only done by the subpartition if there was no view).
    2) SpillableSubpartitionView#nextBuffer:
      If this field is populated when the subpartition is released, it is neither
      given out in subsequent SpillableSubpartitionView#getNextBuffer() calls
      (there was a short path returning 'null' here) nor recycled.

    -> similarly to the PipelinedSubpartition implementation, make
       SpillableSubpartition#release() always clean up and recycle the buffers
    -> recycle SpillableSubpartitionView#nextBuffer in
       SpillableSubpartitionView#releaseAllResources()

----

---
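
For readers without the Flink sources at hand, the two leaks described in the FLINK-8371 commit message can be pictured with a small stand-alone model. This is only a sketch under stated assumptions, not the code from this pull request: `SketchBuffer`, `SketchView`, `SketchSubpartition` and `ReleaseSketchDemo` are hypothetical names, and spilling, reference counting and Flink's real locking are left out. It shows the shape of the fix: `release()` always drains and recycles the queue, and the view recycles its prefetched `nextBuffer`.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for a pooled network buffer; not Flink's Buffer class. */
final class SketchBuffer {
    private boolean recycled;

    void recycle() {
        if (recycled) {
            throw new IllegalStateException("buffer recycled twice");
        }
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

/** Hypothetical view that prefetches one buffer into a nextBuffer field. */
final class SketchView {
    private final ArrayDeque<SketchBuffer> buffers;
    private SketchBuffer nextBuffer;
    private boolean released;

    SketchView(ArrayDeque<SketchBuffer> buffers) {
        this.buffers = buffers;
        synchronized (buffers) {
            nextBuffer = buffers.poll(); // prefetch; may be null
        }
    }

    SketchBuffer getNextBuffer() {
        synchronized (buffers) {
            if (released) {
                return null; // short path: nothing is handed out after release
            }
            SketchBuffer current = nextBuffer;
            nextBuffer = buffers.poll();
            return current;
        }
    }

    void releaseAllResources() {
        synchronized (buffers) {
            if (released) {
                return;
            }
            released = true;
            // Leak 2: the prefetched buffer would otherwise never be recycled.
            if (nextBuffer != null) {
                nextBuffer.recycle();
                nextBuffer = null;
            }
        }
    }
}

/** Hypothetical in-memory subpartition; spilling is deliberately omitted. */
final class SketchSubpartition {
    private final ArrayDeque<SketchBuffer> buffers = new ArrayDeque<>();
    private SketchView view;
    private boolean released;

    void add(SketchBuffer buffer) {
        synchronized (buffers) {
            if (released) {
                buffer.recycle(); // nothing may be queued after release
                return;
            }
            buffers.add(buffer);
        }
    }

    SketchView createView() {
        synchronized (buffers) {
            view = new SketchView(buffers);
            return view;
        }
    }

    void release() {
        SketchView attachedView;
        synchronized (buffers) {
            if (released) {
                return;
            }
            released = true;
            // Leak 1: drain the queue whether or not a view is attached.
            SketchBuffer buffer;
            while ((buffer = buffers.poll()) != null) {
                buffer.recycle();
            }
            attachedView = view;
        }
        if (attachedView != null) {
            attachedView.releaseAllResources();
        }
    }
}

/** Small driver: three buffers, one of them prefetched by the view. */
final class ReleaseSketchDemo {
    public static void main(String[] args) {
        SketchSubpartition subpartition = new SketchSubpartition();
        SketchBuffer first = new SketchBuffer();
        SketchBuffer second = new SketchBuffer();
        subpartition.add(first);
        subpartition.add(second);
        subpartition.createView(); // prefetches 'first'
        subpartition.release();
        System.out.println("all recycled: " + (first.isRecycled() && second.isRecycled()));
    }
}
```

In this model, removing the drain loop from `release()` reproduces the first leak and removing the `nextBuffer.recycle()` call reproduces the second, which is roughly the kind of scenario the tests mentioned in the pull request are described as covering.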