[ https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533438#comment-16533438 ]
ASF GitHub Bot commented on FLINK-9676: --------------------------------------- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/6257#discussion_r200287730 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java --- @@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte @Override public void recycle(MemorySegment segment) { + BufferListener listener; synchronized (availableMemorySegments) { if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) { returnMemorySegment(segment); + return; } else { - BufferListener listener = registeredListeners.poll(); + listener = registeredListeners.poll(); if (listener == null) { availableMemorySegments.add(segment); availableMemorySegments.notify(); + return; } - else { - try { - boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this)); - if (needMoreBuffers) { - registeredListeners.add(listener); - } + } + } + + // We do not know which locks have been acquired before the recycle() or are needed in the + // notification and which other threads also access them. + // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676) + boolean success = false; + boolean needMoreBuffers = false; + try { + needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this)); + success = true; + } catch (Throwable ignored) { + // handled below, under the lock + } + + if (!success || needMoreBuffers) { + synchronized (availableMemorySegments) { + if (isDestroyed) { + // cleanup tasks how they would have been done if we only had one synchronized block + if (needMoreBuffers) { + listener.notifyBufferDestroyed(); } - catch (Throwable ignored) { - availableMemorySegments.add(segment); - availableMemorySegments.notify(); --- End diff -- š > Deadlock during canceling task and recycling exclusive buffer > ------------------------------------------------------------- > > Key: FLINK-9676 > URL: https://issues.apache.org/jira/browse/FLINK-9676 > Project: Flink > Issue Type: Bug > Components: Network > Affects Versions: 1.5.0 > Reporter: zhijiang > Assignee: Nico Kruber > Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > It may cause deadlock between task canceler thread and task thread. > The detail is as follows: > {{Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers > ->Ā {color:#d04437}lock{color}(LocalBufferPool#availableMemorySegments) -> > IC2#notifyBufferAvailable}}Ā >Ā {color:#d04437}try to > lock{color}(IC2#bufferQueue) > {{Task thread -> IC2#recycle -> {color:#d04437}lock{color}(IC2#bufferQueue) > -> bufferQueue#addExclusiveBuffer}} -> {{floatingBuffer#recycleBuffer}} -> > {color:#d04437}try to lock{color}(LocalBufferPool#availableMemorySegments) > One solution isĀ that {{listener#notifyBufferAvailable}}Ā can beĀ calledĀ outside > the {{synchronized(availableMemorySegments) inĀ }}{{LocalBufferPool#recycle.}} > The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle > can cover this case but the deadlock probability is very low, so this UT is > not stable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)