pnowojski commented on a change in pull request #11687: URL: https://github.com/apache/flink/pull/11687#discussion_r422058882
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java ########## @@ -177,17 +178,18 @@ void sendTaskEvent(TaskEvent event) { @Override boolean isReleased() { - return isReleased; + return isReleased.get(); } void releaseAllResources() throws IOException { - ArrayDeque<Buffer> releasedBuffers = new ArrayDeque<>(); - synchronized (receivedBuffers) { - releasedBuffers.addAll(receivedBuffers); - receivedBuffers.clear(); - isReleased = true; + if (isReleased.compareAndSet(false, true)) { Review comment: Agreed. But generally speaking closing/releasing logic is on my list for a larger refactor/clean up for a long time, especially because of those various nasty concurrency issues - I would see closing/releasing eventually moved to task thread/mailbox. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org