akalash commented on a change in pull request #15885: URL: https://github.com/apache/flink/pull/15885#discussion_r633628445
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -120,38 +120,36 @@ public void checkpointStopped(long checkpointId) {
     }
 
     public void onRecoveredStateBuffer(Buffer buffer) {
-        boolean recycleBuffer = true;
         NetworkActionsLogger.traceRecover(
                 "InputChannelRecoveredStateHandler#recover",
                 buffer,
                 inputGate.getOwningTaskName(),
                 channelInfo);
-        try {
-            final boolean wasEmpty;
-            synchronized (receivedBuffers) {
-                // Similar to notifyBufferAvailable(), make sure that we never add a buffer
-                // after releaseAllResources() released all buffers from receivedBuffers.
-                if (isReleased) {
-                    wasEmpty = false;
-                } else {
-                    wasEmpty = receivedBuffers.isEmpty();
-                    receivedBuffers.add(buffer);
-                    recycleBuffer = false;
-                }
-            }
-            if (wasEmpty) {
-                notifyChannelNonEmpty();
-            }
-        } finally {
-            if (recycleBuffer) {
-                buffer.recycleBuffer();
+        final boolean wasEmpty;
+        synchronized (receivedBuffers) {
+            // Similar to notifyBufferAvailable(), make sure that we never add a buffer
+            // after releaseAllResources() released all buffers from receivedBuffers.
+            if (isReleased) {
+                wasEmpty = false;
+            } else {
+                wasEmpty = receivedBuffers.isEmpty();
+                receivedBuffers.add(buffer.retainBuffer());

Review comment:
Just in case, a small spoiler: these changes are not the target of the current task, so I can easily revert them. Of course, it is not OK that different parts of the code are inconsistent with each other, so I will change everything in the same manner once we agree on a solution (or do nothing if we decide to leave everything as is).

As for the original question: as far as I can see, there is currently no clear pattern for using retain/release. That is why I want to find something that everybody can understand.
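To make the retain/release terminology concrete, here is a minimal sketch of a reference-counted buffer. `CountedBuffer` and all of its methods are hypothetical names made up for illustration; this is not Flink's `Buffer` interface (the diff above uses `retainBuffer()` and `recycleBuffer()`), and "recycling" is reduced to setting a flag.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a reference-counted buffer; not Flink's Buffer API. */
final class CountedBuffer {
    // A freshly created buffer has exactly one owner: whoever requested it.
    private final AtomicInteger refCount = new AtomicInteger(1);
    private volatile boolean recycled = false;

    /** Registers one more owner and returns this buffer so that calls can be chained. */
    CountedBuffer retain() {
        // Never resurrect a buffer whose count has already dropped to zero.
        int previous = refCount.getAndUpdate(c -> c == 0 ? 0 : c + 1);
        if (previous == 0) {
            throw new IllegalStateException("buffer was already recycled");
        }
        return this;
    }

    /** Drops one owner; the buffer is recycled when the last owner lets go. */
    void release() {
        int previous = refCount.getAndUpdate(c -> c == 0 ? 0 : c - 1);
        if (previous == 0) {
            throw new IllegalStateException("released more often than retained");
        }
        if (previous == 1) {
            recycled = true; // stand-in for returning the memory to a pool
        }
    }

    int refCount() {
        return refCount.get();
    }

    boolean isRecycled() {
        return recycled;
    }
}

final class RetainReleaseDemo {
    public static void main(String[] args) {
        CountedBuffer buf = new CountedBuffer(); // one owner: the creator
        buf.retain();                            // a second owner, e.g. a queue
        buf.release();                           // the creator is done
        buf.release();                           // the second owner is done, buffer is recycled
        System.out.println("recycled: " + buf.isRecycled());
    }
}
```

With a counter like this, every `retain()` has to be paired with exactly one `release()`, and the open question is only who is responsible for each call.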
In my opinion, the classic pattern for a reference/resource counter looks like this:

- If you request a resource, you must release it, no matter what happens next.
- If you share a resource, you must increase the counter. (Ideally, you can only share through a dedicated structure that does all the work for you, e.g. a BufferList that retains buffers automatically instead of a List<Buffer>.)
- If you are a passive consumer that receives a resource from outside, you should not do anything to manage that resource.

```
void wrong() {
    Buffer buf = getOrCreate(); // request resource without release
    ...
}

void right() {
    Buffer buf = getOrCreate(); // request resource
    try {
        ...
    } finally {
        buf.release();
    }
}
```

```
void wrong(Buffer buf) {
    buf.release(); // how can I be sure that the counter is high enough?
    buf.retain(); // retain without release, can be surprising for the caller of this method
}

void right(Buffer buf) {
    buf.retain(); // useless but possible
    try {
        ...
    } finally {
        buf.release();
    }
}
```

```
void wrong() {
    Buffer buf = getOrCreate(); // request resource
    list.add(buf); // too dangerous: can lead to a leak after refactoring, but possible in extreme cases for good performance
}

void right() {
    Buffer buf = getOrCreate(); // request resource
    try {
        list.add(buf.retain()); // it is better to have a BufferList with automatic 'retain', but this is also possible
    } finally {
        buf.release();
    }
}
```

```
void right(Buffer buf) {
    list.add(buf.retain()); // it is better to have a BufferList with automatic 'retain', but this is also possible
    buf.retain(); // again, it is better to have something more specific for sharing resources with other threads, but this also works
    executor.submit(() -> { /* some work with buffer */ });
}
```

So let's discuss: what do you think about it? Maybe you can also share your own rules for using the retain/release pattern that I could not work out from the code.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org