zhijiangW commented on a change in pull request #11687: URL: https://github.com/apache/flink/pull/11687#discussion_r422054006
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java ########## @@ -177,17 +178,18 @@ void sendTaskEvent(TaskEvent event) { @Override boolean isReleased() { - return isReleased; + return isReleased.get(); } void releaseAllResources() throws IOException { - ArrayDeque<Buffer> releasedBuffers = new ArrayDeque<>(); - synchronized (receivedBuffers) { - releasedBuffers.addAll(receivedBuffers); - receivedBuffers.clear(); - isReleased = true; + if (isReleased.compareAndSet(false, true)) { Review comment: yeah, we can also take the way of adding the `synchronized (receivedBuffers)` for the method of `RecoveredInputGate#isReleased()` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org