[ https://issues.apache.org/jira/browse/FLINK-9755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16540299#comment-16540299 ]
ASF GitHub Bot commented on FLINK-9755:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6272#discussion_r201744892

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
    @@ -277,37 +277,17 @@ public void recycle(MemorySegment segment) {
             // We do not know which locks have been acquired before the recycle() or are needed in the
             // notification and which other threads also access them.
             // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
    -        boolean success = false;
    -        boolean needMoreBuffers = false;
    -        try {
    -            needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
    -            success = true;
    -        } catch (Throwable ignored) {
    -            // handled below, under the lock
    -        }
    +        // Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer and
    +        // therefore end up in this method again.
    +        needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
    -        if (!success || needMoreBuffers) {
    +        if (needMoreBuffers) {
                 synchronized (availableMemorySegments) {
                     if (isDestroyed) {
                         // cleanup tasks how they would have been done if we only had one synchronized block
    -                    if (needMoreBuffers) {
    -                        listener.notifyBufferDestroyed();
    -                    }
    -                    if (!success) {
    -                        returnMemorySegment(segment);
    -                    }
    +                    listener.notifyBufferDestroyed();
                     } else {
    -                    if (needMoreBuffers) {
    -                        registeredListeners.add(listener);
    -                    }
    -                    if (!success) {
    -                        if (numberOfRequestedMemorySegments > currentPoolSize) {
    -                            returnMemorySegment(segment);
    -                        } else {
    -                            availableMemorySegments.add(segment);
    -                            availableMemorySegments.notify();
    -                        }
    -                    }
    +                    registeredListeners.add(listener);
    --- End diff --

    If an exception is thrown, this will not be called. Is that how it is supposed to be?
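To make the control flow behind this question concrete, here is a minimal sketch of the new recycle() path. It is a simplified stand-in, not Flink's code: RecycleSketch, Pool, Listener and FailingListener are hypothetical names, segments are plain strings, and the isDestroyed branch is left out. It assumes, as the new code comment in the diff states, that a failing listener recycles the buffer itself before the exception escapes.

```java
import java.util.ArrayDeque;

/** Simplified, hypothetical model of the recycle() path in the diff; not Flink's actual classes. */
final class RecycleSketch {

    /** Stand-in for a buffer listener; returns true if it wants to be notified again. */
    interface Listener {
        boolean notifyBufferAvailable(String segment);
    }

    static final class Pool {
        private final ArrayDeque<String> availableSegments = new ArrayDeque<>();
        private final ArrayDeque<Listener> registeredListeners = new ArrayDeque<>();

        void register(Listener listener) {
            synchronized (availableSegments) {
                registeredListeners.add(listener);
            }
        }

        void recycle(String segment) {
            Listener listener;
            synchronized (availableSegments) {
                listener = registeredListeners.poll();
                if (listener == null) {
                    // Nobody is waiting: keep the segment in the pool.
                    availableSegments.add(segment);
                    return;
                }
            }
            // Called outside the lock, as in the diff. If this throws, the
            // re-registration below is skipped and the exception reaches the caller.
            boolean needMoreBuffers = listener.notifyBufferAvailable(segment);
            if (needMoreBuffers) {
                synchronized (availableSegments) {
                    registeredListeners.add(listener);
                }
            }
        }

        int listenerCount() {
            synchronized (availableSegments) {
                return registeredListeners.size();
            }
        }

        int availableCount() {
            synchronized (availableSegments) {
                return availableSegments.size();
            }
        }
    }

    /** Assumed behaviour: hand the buffer back to the pool first, then fail. */
    static final class FailingListener implements Listener {
        private final Pool pool;

        FailingListener(Pool pool) {
            this.pool = pool;
        }

        @Override
        public boolean notifyBufferAvailable(String segment) {
            pool.recycle(segment); // re-enters recycle(), as the code comment in the diff expects
            throw new IllegalStateException("simulated failure in listener");
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool();
        pool.register(new FailingListener(pool));
        try {
            pool.recycle("segment-1");
        } catch (IllegalStateException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
        System.out.println("listeners=" + pool.listenerCount()
                + ", available=" + pool.availableCount());
    }
}
```

In this model the failing listener is not re-registered, the segment still ends up back in the pool through the nested recycle() call, and the exception surfaces in whichever thread called recycle(). Whether dropping the listener in that case is intended is exactly what the question above asks.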

> Exceptions in RemoteInputChannel#notifyBufferAvailable() are not propagated
> to the responsible thread
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9755
>                 URL: https://issues.apache.org/jira/browse/FLINK-9755
>             Project: Flink
>          Issue Type: Bug
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> The credit-based flow control implementation of
> RemoteInputChannel#notifyBufferAvailable() does not forward errors (like the
> {{IllegalStateException}}) to the thread that is being notified. The calling
> code at {{LocalBufferPool#recycle}}, however, relies on the callback
> forwarding errors and completely ignores any failures.
> Therefore, we could end up with a program waiting forever for the callback
> and not even a failure message in the logs.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)