akalash commented on a change in pull request #15885:
URL: https://github.com/apache/flink/pull/15885#discussion_r633578223
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
##########
@@ -153,7 +153,12 @@ public BufferRecycler getRecycler() {
     }
 
     public void recycle() {
-        recycler.recycle(memorySegment);
+        // If at least one consumer was created then they responsible for the memory recycling
+        // because BufferBuilder doesn't contain a references counter so it will be impossible to
+        // correctly recycle memory here.
+        if (!bufferConsumerCreated) {
+            recycler.recycle(memorySegment);
+        }

Review comment:
Yes, it is still definitely a hack. In my opinion, the right solution is to avoid writing directly into the `MemorySegment` from `BufferBuilder`. That means changing the implementation of `BufferBuilder` so that it uses a `Buffer` instead of a `MemorySegment`. As I understand it, you don't have any objections to such a solution as long as the benchmark doesn't show any degradation?

In any case, some answers to the questions:
- The contract is simple: `BufferBuilder#recycle()` should always be called when the `BufferBuilder` is no longer needed. The caller doesn't need to care whether a `BufferConsumer` was created or not.
- In general, renaming `recycle()` to `close()` makes sense to me, since `BufferBuilder` doesn't have a `retain` method and, ideally, should be closed after use. (We can think about it once we agree on a final solution.)
- There are still two unresolved problems: writing to an already released `memorySegment`, and creating a `BufferConsumer` from an already closed `BufferBuilder`.
  Both can be resolved by the solution we already discussed (using a `Buffer` instead of a `memorySegment` inside `BufferBuilder`).

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -120,38 +120,36 @@ public void checkpointStopped(long checkpointId) {
     }
 
     public void onRecoveredStateBuffer(Buffer buffer) {
-        boolean recycleBuffer = true;
         NetworkActionsLogger.traceRecover(
                 "InputChannelRecoveredStateHandler#recover",
                 buffer,
                 inputGate.getOwningTaskName(),
                 channelInfo);
-        try {
-            final boolean wasEmpty;
-            synchronized (receivedBuffers) {
-                // Similar to notifyBufferAvailable(), make sure that we never add a buffer
-                // after releaseAllResources() released all buffers from receivedBuffers.
-                if (isReleased) {
-                    wasEmpty = false;
-                } else {
-                    wasEmpty = receivedBuffers.isEmpty();
-                    receivedBuffers.add(buffer);
-                    recycleBuffer = false;
-                }
-            }
-            if (wasEmpty) {
-                notifyChannelNonEmpty();
-            }
-        } finally {
-            if (recycleBuffer) {
-                buffer.recycleBuffer();
+        final boolean wasEmpty;
+        synchronized (receivedBuffers) {
+            // Similar to notifyBufferAvailable(), make sure that we never add a buffer
+            // after releaseAllResources() released all buffers from receivedBuffers.
+            if (isReleased) {
+                wasEmpty = false;
+            } else {
+                wasEmpty = receivedBuffers.isEmpty();
+                receivedBuffers.add(buffer.retainBuffer());

Review comment:
Just in case, a small spoiler: these changes are outside the scope of the current task, so I can easily revert them. Of course, it is not OK that different parts of the code are inconsistent, so once we agree on a solution I will change everything in the same manner (or do nothing if we decide to leave everything as is).

Regarding the original question: as far as I can see, there is currently no clear pattern for using retain/release. That is why I want to find something that will be understandable for everybody.
In my opinion, the classic reference-counting (resource-counting) pattern looks like this:
- If you request a resource, you must release it, no matter what happens next.
- If you share a resource, you should increase the counter (ideally, sharing is only possible through a dedicated structure that does all the work for you, e.g. a `BufferList` instead of a `List<Buffer>`, which retains buffers automatically).
- If you are a passive consumer that receives a resource from outside, you should not manage that resource at all.

```
void wrong() {
    Buffer buf = getOrCreate(); // requests the resource but never releases it
    // ... use buf ...
}

void right() {
    Buffer buf = getOrCreate(); // request the resource
    try {
        // ... use buf ...
    } finally {
        buf.release();
    }
}
```

```
void wrong(Buffer buf) {
    buf.release(); // how can I be sure the counter is high enough for this?
    buf.retain();  // retain without release can surprise the caller of this method
}

void right(Buffer buf) {
    buf.retain(); // redundant, but possible
    try {
        // ... use buf ...
    } finally {
        buf.release();
    }
}
```

```
void wrong() {
    Buffer buf = getOrCreate(); // request the resource
    list.add(buf); // too dangerous: can lead to a leak after refactoring, but acceptable in extreme cases for performance
}

void right() {
    Buffer buf = getOrCreate(); // request the resource
    try {
        list.add(buf.retain()); // a BufferList with automatic retain would be better, but this also works
    } finally {
        buf.release();
    }
}
```

```
void right(Buffer buf) {
    list.add(buf.retain()); // a BufferList with automatic retain would be better, but this also works
    buf.retain(); // again, a dedicated structure for sharing with other threads would be better, but this also works
    executor.submit(() -> {
        try { /* some work with buf */ } finally { buf.release(); } // the task releases the reference retained for it
    });
}
```

So let's discuss what you think about it. Maybe you can also share your own rules for the retain/release pattern, which I couldn't infer from the code.
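For illustration, here is a minimal, self-contained Java sketch of the first comment's proposal (backing `BufferBuilder` with a reference-counted buffer instead of a raw `MemorySegment`), written according to the rules above. All class names (`RefCountedBuffer`, `SketchBufferBuilder`, `SketchBufferConsumer`, `RefCountDemo`) are hypothetical. They are not Flink's `Buffer`, `BufferBuilder` or `BufferConsumer` API, and a `java.nio.ByteBuffer` stands in for the `MemorySegment`. The builder owns one reference and always releases it in `close()`, while every consumer takes its own reference through `retain()`, so no `bufferConsumerCreated` flag is needed and the memory is recycled only when the last holder releases it.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical ref-counted buffer; a stand-in, NOT Flink's Buffer/NetworkBuffer API. */
final class RefCountedBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1); // the creator owns the first reference
    private final ByteBuffer memory; // stands in for the MemorySegment
    private final Runnable recycler; // runs exactly once, when the count drops to zero

    RefCountedBuffer(int capacity, Runnable recycler) {
        this.memory = ByteBuffer.allocate(capacity);
        this.recycler = recycler;
    }

    RefCountedBuffer retain() {
        while (true) { // CAS loop: a buffer whose count reached zero can never be revived
            int current = refCount.get();
            if (current <= 0) throw new IllegalStateException("retain() on a released buffer");
            if (refCount.compareAndSet(current, current + 1)) return this;
        }
    }

    void release() {
        int remaining = refCount.decrementAndGet();
        if (remaining == 0) recycler.run();
        else if (remaining < 0) throw new IllegalStateException("release() called too often");
    }

    ByteBuffer memory() { return memory; }
}

/** Hypothetical builder: owns one reference and ALWAYS releases it in close(). */
final class SketchBufferBuilder implements AutoCloseable {
    private final RefCountedBuffer buffer;
    private boolean closed;

    SketchBufferBuilder(RefCountedBuffer buffer) { this.buffer = buffer; }

    void append(byte value) {
        // While open, our own reference keeps the memory from being recycled under us.
        if (closed) throw new IllegalStateException("write after close()");
        buffer.memory().put(value);
    }

    SketchBufferConsumer createConsumer() {
        // Sharing = retain: each consumer owns its own reference, so no flag is needed.
        if (closed) throw new IllegalStateException("consumer created from a closed builder");
        return new SketchBufferConsumer(buffer.retain());
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            buffer.release(); // unconditional, whether or not consumers were created
        }
    }
}

/** Hypothetical consumer: releases the reference it was handed exactly once. */
final class SketchBufferConsumer implements AutoCloseable {
    private RefCountedBuffer buffer;

    SketchBufferConsumer(RefCountedBuffer buffer) { this.buffer = buffer; }

    // Plain read, no counting: the consumer already owns a reference. get(0) is absolute.
    byte firstByte() { return buffer.memory().get(0); }

    @Override
    public void close() {
        if (buffer != null) {
            buffer.release();
            buffer = null;
        }
    }
}

final class RefCountDemo {
    public static void main(String[] args) {
        AtomicInteger recycled = new AtomicInteger();
        SketchBufferBuilder builder =
                new SketchBufferBuilder(new RefCountedBuffer(32, recycled::incrementAndGet));
        SketchBufferConsumer consumer;
        try { // rule 1: whoever requests the resource releases it, whatever happens next
            builder.append((byte) 42);
            consumer = builder.createConsumer(); // rule 2: sharing increases the counter
        } finally {
            builder.close();
        }
        System.out.println(recycled.get()); // 0: the consumer still holds a reference
        System.out.println(consumer.firstByte()); // 42
        consumer.close();
        System.out.println(recycled.get()); // 1: last reference released, memory recycled
    }
}
```

Closing the builder does not recycle the memory while a consumer still holds a reference. In this sketch, writing to a closed builder or creating a consumer from it fails fast with `IllegalStateException`, which is one possible way to surface the two unresolved cases from the first comment.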