GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/5276

    [FLINK-8371][network] always recycle Buffers when releasing 
SpillableSubpartition

    ## What is the purpose of the change
    
    This is a rebase of #5261 for the release-1.4 branch. Please refer to the 
original PR for details.
    
    Please also note that this PR includes #5275.
      

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-8371-1.4

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5276
    
----
commit 2c667dea8e711fc928a37838c9f6cd5f6a9f7685
Author: Nico Kruber <nico@...>
Date:   2017-11-23T12:09:37Z

    [hotfix] only update buffer statistics in SpillableSubpartition#add() if 
successful

commit 9398dafae6918488582bc89c1ab73e3e2bd5ea16
Author: Nico Kruber <nico@...>
Date:   2017-11-23T13:59:18Z

    [hotfix] add some more buffer recycling checks in SpillableSubpartitionTest

commit 074bfbb82c4c18acf9ba383473fdefa89e1bf095
Author: Nico Kruber <nico@...>
Date:   2017-11-23T13:58:21Z

    [FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle 
the buffer in case of failures
    
    This fixes a double-recycle in SpillableSubpartitionView and also makes sure
    that even if adding the (asynchronous) write operation fails, the buffer is
    properly freed in code that did not perform this cleanup. It avoids code
    duplication of this cleanup and it is also more consistent to take over
    responsibility of the given buffer even if an exception is thrown.
    
    [FLINK-7499][io] complete the idiom of ResultSubpartition#add() taking over 
ownership of the buffer
    
    The buffer will now always be released once and at the right time and the 
caller
    must not worry about the buffer release if a called function threw an 
exception.

commit be12c1a8169b9d4d3cfb6c39675273ffc953531a
Author: Nico Kruber <nico@...>
Date:   2018-01-05T17:18:35Z

    [FLINK-8371][network] always recycle Buffers when releasing 
SpillableSubpartition
    
    There were places where Buffer instances were not released upon
    SpillableSubpartition#release() with a view attached to a non-spilled
    subpartition:
    
    1) SpillableSubpartition#buffer:
      SpillableSubpartition#release() delegates the recycling to the view, but
      SpillableSubpartitionView does not clean up the 'buffers' queue (the
      recycling was only done by the subpartition if there was no view).
    2) SpillableSubpartitionView#nextBuffer:
      If this field is populated when the subpartition is released, it will 
neither
      be given out in subsequent SpillableSubpartitionView#getNextBuffer() calls
      (there was a short path returning 'null' here), nor was it recycled
    
    -> similarly to the PipelinesSubpartition implementation, make
       SpillableSubpartition#release() always clean up and recycle the buffers
    -> recycle SpillableSubpartitionView#nextBuffer in
       SpillableSubpartitionView#releaseAllResources()

----


---

Reply via email to