Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/6257#discussion_r200252253 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java --- @@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte @Override public void recycle(MemorySegment segment) { + BufferListener listener; synchronized (availableMemorySegments) { if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) { returnMemorySegment(segment); + return; } else { - BufferListener listener = registeredListeners.poll(); + listener = registeredListeners.poll(); if (listener == null) { availableMemorySegments.add(segment); availableMemorySegments.notify(); + return; } - else { - try { - boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this)); - if (needMoreBuffers) { - registeredListeners.add(listener); - } + } + } + + // We do not know which locks have been acquired before the recycle() or are needed in the + // notification and which other threads also access them. + // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676) + boolean success = false; + boolean needMoreBuffers = false; + try { + needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this)); + success = true; + } catch (Throwable ignored) { + // handled below, under the lock + } + + if (!success || needMoreBuffers) { + synchronized (availableMemorySegments) { + if (isDestroyed) { + // cleanup tasks how they would have been done if we only had one synchronized block + if (needMoreBuffers) { + listener.notifyBufferDestroyed(); } - catch (Throwable ignored) { - availableMemorySegments.add(segment); - availableMemorySegments.notify(); --- End diff -- I am wondering whether we should rethrow this exception under below handling in the end. For example: During `RemoteInputChannel#notifyBufferAvailable`, if the tag of `isWaitingForFloatingBuffers` is not consistent, we should throw this exception to trigger failover, otherwise we can not find the potential bug.
---