Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6272#discussion_r201969455

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
    @@ -277,37 +277,17 @@ public void recycle(MemorySegment segment) {
     // We do not know which locks have been acquired before the recycle() or are needed in the
     // notification and which other threads also access them.
     // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
    -boolean success = false;
    -boolean needMoreBuffers = false;
    -try {
    -    needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
    -    success = true;
    -} catch (Throwable ignored) {
    -    // handled below, under the lock
    -}
    +// Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer and
    +// therefore end up in this method again.
    +needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));

    -if (!success || needMoreBuffers) {
    +if (needMoreBuffers) {
         synchronized (availableMemorySegments) {
             if (isDestroyed) {
                 // cleanup tasks how they would have been done if we only had one synchronized block
    -            if (needMoreBuffers) {
    -                listener.notifyBufferDestroyed();
    -            }
    -            if (!success) {
    -                returnMemorySegment(segment);
    -            }
    +            listener.notifyBufferDestroyed();
             } else {
    -            if (needMoreBuffers) {
    -                registeredListeners.add(listener);
    -            }
    -            if (!success) {
    -                if (numberOfRequestedMemorySegments > currentPoolSize) {
    -                    returnMemorySegment(segment);
    -                } else {
    -                    availableMemorySegments.add(segment);
    -                    availableMemorySegments.notify();
    -                }
    -            }
    +            registeredListeners.add(listener);
    --- End diff --

    That's how it always was:

    a) If the exception was caught in the `notifyBufferAvailable()` handler, `needMoreBuffers` is `false` and we expect the error handling in that method to fail the logic that the thread waiting on the listener is relying on; otherwise this thread's error handling needs to re-register.

    b) If `notifyBufferAvailable()` throws (this is not allowed anymore!), the previous logic also did not re-add the listener.
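
    For illustration only, here is a minimal sketch of case a). It is not the Flink code: `SketchPool`, `SketchListener`, `FailingListener` and `RecycleSketch` are hypothetical, heavily simplified stand-ins for `LocalBufferPool` and its listener, and a plain `Object` stands in for a memory segment. It only shows the contract discussed here: the listener is called outside the lock, it must not throw, and when it fails internally it recycles the buffer itself and returns `false`, so the pool does not re-add it.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for the buffer listener; not Flink's real interface. */
interface SketchListener {
    /** Returns true if more buffers are wanted. Must not throw: on failure it recycles the segment itself. */
    boolean notifyBufferAvailable(SketchPool pool, Object segment);
}

/** Hypothetical, simplified stand-in for LocalBufferPool. */
final class SketchPool {
    final Queue<Object> availableSegments = new ArrayDeque<>();
    final Queue<SketchListener> registeredListeners = new ArrayDeque<>();

    void recycle(Object segment) {
        SketchListener listener;
        synchronized (availableSegments) {
            listener = registeredListeners.poll();
            if (listener == null) {
                // Nobody is waiting: the segment simply goes back into the pool.
                availableSegments.add(segment);
                return;
            }
        }
        // Called outside the synchronized block, as in the diff above.
        boolean needMoreBuffers = listener.notifyBufferAvailable(this, segment);
        if (needMoreBuffers) {
            synchronized (availableSegments) {
                registeredListeners.add(listener);
            }
        }
    }
}

/** Case a): the listener fails internally, recycles the buffer, and reports false. */
final class FailingListener implements SketchListener {
    @Override
    public boolean notifyBufferAvailable(SketchPool pool, Object segment) {
        try {
            throw new IllegalStateException("simulated failure while handling the buffer");
        } catch (Throwable t) {
            // The listener's own error handling hands the segment back,
            // which re-enters SketchPool.recycle().
            pool.recycle(segment);
            return false;
        }
    }
}

final class RecycleSketch {
    public static void main(String[] args) {
        SketchPool pool = new SketchPool();
        pool.registeredListeners.add(new FailingListener());
        pool.recycle(new Object());
        System.out.println("available segments: " + pool.availableSegments.size());
        System.out.println("registered listeners: " + pool.registeredListeners.size());
    }
}
```

    In this sketch the failing listener is not re-registered, and the segment ends up back in the pool via the nested `recycle()` call, which is the behaviour the new comment in the diff relies on.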
---