[ 
https://issues.apache.org/jira/browse/FLINK-9755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16538259#comment-16538259
 ] 

ASF GitHub Bot commented on FLINK-9755:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6272#discussion_r201260221
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -360,32 +360,45 @@ public boolean notifyBufferAvailable(Buffer buffer) {
                        return false;
                }
     
    -           boolean needMoreBuffers = false;
    -           synchronized (bufferQueue) {
    -                   checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
    +           boolean recycleBuffer = true;
    +           try {
    +                   boolean needMoreBuffers = false;
    +                   synchronized (bufferQueue) {
    +                           checkState(isWaitingForFloatingBuffers,
    +                                   "This channel should be waiting for floating buffers.");
    +
    +                           // Important: double check the isReleased state inside synchronized block, so there is no
    +                           // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    +                           if (isReleased.get() ||
    +                                   bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    +                                   isWaitingForFloatingBuffers = false;
    +                                   buffer.recycleBuffer();
    +                                   return false;
    +                           }
     
    -                   // Important: double check the isReleased state inside synchronized block, so there is no
    -                   // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    -                   if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    -                           isWaitingForFloatingBuffers = false;
    -                           buffer.recycleBuffer();
    -                           return false;
    -                   }
    +                           // note: this call may fail, for better cleanup, increase the counter first
    +                           if (unannouncedCredit.getAndAdd(1) == 0) {
    +                                   notifyCreditAvailable();
    --- End diff --
    
    Yes, this one is tricky.
    Logically, the order should be: first add the floating buffer, then enqueue the channel. However, `notifyCreditAvailable()` is outside our control and could potentially fail. If it fails, we have two options:
    1) revert any changes made by `notifyBufferAvailable()` and free the buffer so that it can be directly re-used by `LocalBufferPool`
    
    2) keep everything done by `notifyBufferAvailable()` and rely on `setError()` to clean up eventually - this means we should not recycle the buffer if it was already added to the `bufferQueue`.
    
    On second thought, it may indeed be better to go for solution 2 and keep the logical execution order as you proposed (in fact, I can't really think of a reason now why I did not do that). We should be able to steer the recycling via the `recycleBuffer` flag, as is already done in other cases.


> Exceptions in RemoteInputChannel#notifyBufferAvailable() are not propagated to the responsible thread
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9755
>                 URL: https://issues.apache.org/jira/browse/FLINK-9755
>             Project: Flink
>          Issue Type: Bug
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> The credit-based flow control implementation of 
> RemoteInputChannel#notifyBufferAvailable() does not forward errors (like the 
> {{IllegalStateException}}) to the thread that is being notified. The calling 
> code at {{LocalBufferPool#recycle}}, however, relies on the callback 
> forwarding errors and completely ignores any failures.
> Therefore, we could end up with a program waiting forever for the callback 
> and not even a failure message in the logs.
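
The propagation problem described in the issue can be illustrated with a small sketch. Everything here is an assumption made for illustration: `SketchBufferListener`, `ForwardingChannel`, `SketchPool` and `checkError()` are hypothetical names, not Flink's API. The idea is only that a callback invoked by a pool which does no error handling has to capture its own failure, so that the thread consuming from the channel can see it later.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical listener interface; not Flink's actual interface. */
interface SketchBufferListener {
    void notifyBufferAvailable();
}

/** Hypothetical channel that stores callback failures for its consumer thread. */
class ForwardingChannel implements SketchBufferListener {
    private final AtomicReference<Throwable> cause = new AtomicReference<>();
    private final boolean brokenState; // assumption: simulates the failing state check

    ForwardingChannel(boolean brokenState) {
        this.brokenState = brokenState;
    }

    @Override
    public void notifyBufferAvailable() {
        try {
            if (brokenState) {
                throw new IllegalStateException(
                    "This channel should be waiting for floating buffers.");
            }
        } catch (Throwable t) {
            // Keep the first failure instead of letting it vanish in the caller.
            cause.compareAndSet(null, t);
        }
    }

    /** Called by the thread that consumes from this channel. */
    void checkError() throws IOException {
        Throwable t = cause.get();
        if (t != null) {
            throw new IOException("Channel failed", t);
        }
    }
}

/** Hypothetical pool whose recycle path does no error handling of its own. */
class SketchPool {
    static void recycle(SketchBufferListener listener) {
        listener.notifyBufferAvailable();
    }
}
```

In this sketch the recycling thread never sees the exception, but the next `checkError()` call on the consuming side fails with the original cause attached instead of waiting forever.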



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
