[ https://issues.apache.org/jira/browse/FLINK-9755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16540299#comment-16540299 ]

ASF GitHub Bot commented on FLINK-9755:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6272#discussion_r201744892
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
    @@ -277,37 +277,17 @@ public void recycle(MemorySegment segment) {
                // We do not know which locks have been acquired before the recycle() or are needed in the
                // notification and which other threads also access them.
                // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
    -           boolean success = false;
    -           boolean needMoreBuffers = false;
    -           try {
    -                   needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
    -                   success = true;
    -           } catch (Throwable ignored) {
    -                   // handled below, under the lock
    -           }
    +           // Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer and
    +           // therefore end up in this method again.
    +           needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
     
    -           if (!success || needMoreBuffers) {
    +           if (needMoreBuffers) {
                        synchronized (availableMemorySegments) {
                                if (isDestroyed) {
                                        // cleanup tasks how they would have been done if we only had one synchronized block
    -                                   if (needMoreBuffers) {
    -                                           listener.notifyBufferDestroyed();
    -                                   }
    -                                   if (!success) {
    -                                           returnMemorySegment(segment);
    -                                   }
    +                                   listener.notifyBufferDestroyed();
                                } else {
    -                                   if (needMoreBuffers) {
    -                                           registeredListeners.add(listener);
    -                                   }
    -                                   if (!success) {
    -                                           if (numberOfRequestedMemorySegments > currentPoolSize) {
    -                                                   returnMemorySegment(segment);
    -                                           } else {
    -                                                   availableMemorySegments.add(segment);
    -                                                   availableMemorySegments.notify();
    -                                           }
    -                                           }
    -                                   }
    +                                   registeredListeners.add(listener);
    --- End diff --
    
    If an exception is thrown, this will not be called. Is that how it is supposed to be?
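
The control flow behind this question can be reproduced in isolation. The following is a minimal sketch, not the real `LocalBufferPool`: `RecycleSketch`, `Pool` and `Listener` are hypothetical stand-ins that keep only the shape of the new code path. It shows that when the callback throws, the exception leaves `recycle()` immediately and the listener is not re-registered.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class RecycleSketch {

    /** Hypothetical stand-in for the listener interface discussed in the diff. */
    interface Listener {
        /** Returns true if the listener wants more buffers. */
        boolean notifyBufferAvailable(Object buffer);
    }

    /** Hypothetical, heavily simplified pool; not the real LocalBufferPool. */
    static class Pool {
        final Queue<Listener> registeredListeners = new ArrayDeque<>();

        void recycle(Object buffer, Listener listener) {
            // The callback runs outside the lock. If it throws, the exception
            // propagates to the caller and nothing below this line executes.
            boolean needMoreBuffers = listener.notifyBufferAvailable(buffer);

            if (needMoreBuffers) {
                synchronized (registeredListeners) {
                    registeredListeners.add(listener);
                }
            }
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool();
        Listener failing = buffer -> {
            throw new IllegalStateException("listener failed");
        };
        try {
            pool.recycle(new Object(), failing);
        } catch (IllegalStateException e) {
            System.out.println("propagated: " + e.getMessage());
        }
        System.out.println("registered listeners: " + pool.registeredListeners.size());
    }
}
```

Whether skipping the re-registration is acceptable depends on the contract stated in the new comment, namely that a failing `notifyBufferAvailable()` recycles the buffer itself; the sketch does not model that part.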


> Exceptions in RemoteInputChannel#notifyBufferAvailable() are not propagated 
> to the responsible thread
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9755
>                 URL: https://issues.apache.org/jira/browse/FLINK-9755
>             Project: Flink
>          Issue Type: Bug
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> The credit-based flow control implementation of 
> RemoteInputChannel#notifyBufferAvailable() does not forward errors (like the 
> {{IllegalStateException}}) to the thread that is being notified. The calling 
> code at {{LocalBufferPool#recycle}}, however, relies on the callback 
> forwarding errors and completely ignores any failures.
> Therefore, we could end up with a program waiting forever for the callback 
> and not even a failure message in the logs.
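
One common way to avoid such a silent hang is for the callback to record its failure so that the responsible thread can rethrow it later. The sketch below illustrates that idea only; it is an assumption for illustration and not necessarily how the pull request resolves the issue. `ErrorForwardingSketch`, `Channel` and `checkError()` are hypothetical names, and the `null` buffer is a contrived trigger for the failure.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ErrorForwardingSketch {

    /** Hypothetical channel that records callback failures for its consumer. */
    static class Channel {
        private final AtomicReference<Throwable> cause = new AtomicReference<>();

        /** Called from a foreign thread, e.g. whichever thread recycles a buffer. */
        boolean notifyBufferAvailable(Object buffer) {
            try {
                if (buffer == null) {
                    throw new IllegalStateException("no buffer");
                }
                return false; // no further buffers needed in this sketch
            } catch (Throwable t) {
                // Keep the first failure instead of letting it vanish silently.
                cause.compareAndSet(null, t);
                return false;
            }
        }

        /** Called by the responsible (consuming) thread before it waits for data. */
        void checkError() {
            Throwable t = cause.get();
            if (t != null) {
                throw new IllegalStateException("channel failed", t);
            }
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        channel.notifyBufferAvailable(null); // fails on the notifying thread
        try {
            channel.checkError();            // surfaces on the consuming thread
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + ": " + e.getCause().getMessage());
        }
    }
}
```

With this pattern the consumer fails with a visible error instead of waiting forever, at the cost of one extra check before each blocking wait.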



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)