[ https://issues.apache.org/jira/browse/FLINK-9755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16538213#comment-16538213 ]
ASF GitHub Bot commented on FLINK-9755:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6272#discussion_r201242662

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -360,32 +360,45 @@ public boolean notifyBufferAvailable(Buffer buffer) {
             return false;
         }
-        boolean needMoreBuffers = false;
-        synchronized (bufferQueue) {
-            checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
+        boolean recycleBuffer = true;
+        try {
+            boolean needMoreBuffers = false;
+            synchronized (bufferQueue) {
+                checkState(isWaitingForFloatingBuffers,
+                    "This channel should be waiting for floating buffers.");
+
+                // Important: double check the isReleased state inside synchronized block, so there is no
+                // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
+                if (isReleased.get() ||
+                    bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
+                    isWaitingForFloatingBuffers = false;
+                    buffer.recycleBuffer();
+                    return false;
+                }
-
-            // Important: double check the isReleased state inside synchronized block, so there is no
-            // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
-            if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
-                isWaitingForFloatingBuffers = false;
-                buffer.recycleBuffer();
-                return false;
-            }
+                // note: this call may fail, for better cleanup, increase the counter first
+                if (unannouncedCredit.getAndAdd(1) == 0) {
+                    notifyCreditAvailable();
--- End diff --

From the failure-handling point of view, `notifyCreditAvailable` should be called before `bufferQueue.addFloatingBuffer`.
From another point of view, however, it is logically stricter to make sure the buffer is ready before announcing the credit; otherwise the channel may receive new data before this floating buffer has been queued, although that can hardly happen with the current implementation.

Another concern is that `notifyCreditAvailable` itself is thread-safe, so it does not strictly need the lock; we call it inside the `synchronized` block only to make failure handling easier. Since `notifyCreditAvailable` is currently very lightweight, the extra cost can be ignored.

These two concerns may be unnecessary, though, and I can accept the current modifications.

> Exceptions in RemoteInputChannel#notifyBufferAvailable() are not propagated
> to the responsible thread
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-9755
> URL: https://issues.apache.org/jira/browse/FLINK-9755
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.5.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> The credit-based flow control implementation of
> RemoteInputChannel#notifyBufferAvailable() does not forward errors (like the
> {{IllegalStateException}}) to the thread that is being notified. The calling
> code at {{LocalBufferPool#recycle}}, however, relies on the callback
> forwarding errors and completely ignores any failures.
> Therefore, we could end up with a program waiting forever for the callback
> and not even a failure message in the logs.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
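A minimal, self-contained sketch of the pattern under discussion may help. The callback recycles a buffer only if the queue never took ownership of it, counts the credit before announcing it, and records any failure for the consuming thread instead of letting the caller (here, the buffer pool) swallow it. Everything in it is hypothetical: `FloatingBufferChannel`, `PooledBuffer`, `CreditAnnouncer`, `checkError()` and the other names are illustrative stand-ins loosely modeled on the diff, not Flink's actual `RemoteInputChannel` or `LocalBufferPool` API. It also assumes a plain `java.util.ArrayDeque` queue guarded by `synchronized`.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical buffer that can be returned to the pool it came from. */
interface PooledBuffer {
    void recycle();
}

/** Hypothetical hook that announces credit to the sender; may throw. */
interface CreditAnnouncer {
    void announceCredit();
}

/** Simplified sketch, NOT Flink's RemoteInputChannel. */
class FloatingBufferChannel {
    private final ArrayDeque<PooledBuffer> bufferQueue = new ArrayDeque<>();
    private final AtomicInteger unannouncedCredit = new AtomicInteger();
    // First failure seen by the callback; read later by the consuming thread.
    private final AtomicReference<Throwable> cause = new AtomicReference<>();
    private final CreditAnnouncer announcer;
    private final int numRequiredBuffers;
    private boolean released; // guarded by bufferQueue

    FloatingBufferChannel(CreditAnnouncer announcer, int numRequiredBuffers) {
        this.announcer = announcer;
        this.numRequiredBuffers = numRequiredBuffers;
    }

    /**
     * Called on the buffer pool's thread. Returns true if the channel wants more
     * floating buffers. Never throws: failures are recorded for the consumer.
     */
    boolean notifyBufferAvailable(PooledBuffer buffer) {
        boolean recycleBuffer = true; // true until the queue takes ownership
        try {
            synchronized (bufferQueue) {
                // Re-check inside the lock so a concurrent release cannot race us.
                if (released || bufferQueue.size() >= numRequiredBuffers) {
                    return false; // the finally block recycles the unused buffer
                }
                bufferQueue.add(buffer);
                recycleBuffer = false; // from here on, releaseAllResources() recycles it
                // Count the credit before announcing, so a failed announcement
                // still leaves the counter consistent with the queued buffer.
                if (unannouncedCredit.getAndIncrement() == 0) {
                    announcer.announceCredit();
                }
                return bufferQueue.size() < numRequiredBuffers;
            }
        } catch (Throwable t) {
            // Forward the error instead of letting the pool's thread drop it.
            cause.compareAndSet(null, t);
            return false;
        } finally {
            if (recycleBuffer) {
                buffer.recycle();
            }
        }
    }

    /** Called on the consuming thread; rethrows a failure recorded by the callback. */
    void checkError() throws Exception {
        Throwable t = cause.get();
        if (t != null) {
            throw new Exception("notifyBufferAvailable failed", t);
        }
    }

    /** Marks the channel released and returns all queued buffers to their pool. */
    void releaseAllResources() {
        synchronized (bufferQueue) {
            released = true;
            PooledBuffer b;
            while ((b = bufferQueue.poll()) != null) {
                b.recycle();
            }
        }
    }

    int queuedBuffers() {
        synchronized (bufferQueue) {
            return bufferQueue.size();
        }
    }

    int unannouncedCredit() {
        return unannouncedCredit.get();
    }
}
```

This sketch takes the stricter ordering the review describes: the buffer is queued before credit is announced, so the sender is never granted credit for a buffer that is not yet in the queue. If the announcement then fails, the queue already owns the buffer and `releaseAllResources()` recycles it later, while the `finally` block only recycles buffers that were never handed over. The change eventually merged in Flink may order these steps differently.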