akalash commented on a change in pull request #15885:
URL: https://github.com/apache/flink/pull/15885#discussion_r633578223



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
##########
@@ -153,7 +153,12 @@ public BufferRecycler getRecycler() {
     }
 
     public void recycle() {
-        recycler.recycle(memorySegment);
+        // If at least one consumer was created then they responsible for the memory recycling
+        // because BufferBuilder doesn't contain a references counter so it will be impossible to
+        // correctly recycle memory here.
+        if (!bufferConsumerCreated) {
+            recycler.recycle(memorySegment);
+        }

Review comment:
       Yes, it is still definitely a hack. In my opinion, the right solution is to avoid writing directly into the `MemorySegment` from `BufferBuilder`. That is, we should change the implementation of `BufferBuilder` so that it uses a `Buffer` instead of a `MemorySegment`. As I understand it, you no longer have any objections to such a solution as long as the benchmark doesn't show any degradation?
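
   To make the proposal concrete, here is a minimal sketch of a `BufferBuilder` that writes through a reference-counted buffer instead of a raw `MemorySegment`. All names (`SegmentPool`, `RefCountedBuffer`, `SketchBufferBuilder`, `SketchBufferConsumer`) are hypothetical stand-ins, not Flink's actual `Buffer`/`NetworkBuffer` API, and a `byte[]` plays the role of the memory segment. The point it illustrates: each consumer retains its own reference, so `recycle()` can release the builder's reference unconditionally, and the memory returns to the pool only when the last holder releases it, with no `bufferConsumerCreated` flag.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical stand-in for a pool of memory segments (byte[] here).
   class SegmentPool {
       final Deque<byte[]> free = new ArrayDeque<>();

       void recycle(byte[] segment) {
           free.push(segment);
       }
   }

   // Hypothetical ref-counted buffer: the segment returns to the pool at count zero.
   class RefCountedBuffer {
       private final byte[] segment;
       private final SegmentPool pool;
       private final AtomicInteger refCnt = new AtomicInteger(1); // creator owns one reference

       RefCountedBuffer(byte[] segment, SegmentPool pool) {
           this.segment = segment;
           this.pool = pool;
       }

       RefCountedBuffer retain() {
           refCnt.incrementAndGet();
           return this;
       }

       void release() {
           int remaining = refCnt.decrementAndGet();
           if (remaining == 0) {
               pool.recycle(segment);
           } else if (remaining < 0) {
               throw new IllegalStateException("released more often than retained");
           }
       }

       int refCount() {
           return refCnt.get();
       }

       byte[] segment() {
           return segment;
       }
   }

   // Hypothetical consumer: holds its own reference and releases it when done.
   class SketchBufferConsumer {
       private final RefCountedBuffer buffer;

       SketchBufferConsumer(RefCountedBuffer buffer) {
           this.buffer = buffer;
       }

       void close() {
           buffer.release();
       }
   }

   // Hypothetical builder: writes through the buffer rather than a raw segment.
   class SketchBufferBuilder {
       private final RefCountedBuffer buffer;
       private int writerPosition;

       SketchBufferBuilder(RefCountedBuffer buffer) {
           this.buffer = buffer;
       }

       void append(byte b) {
           buffer.segment()[writerPosition++] = b;
       }

       SketchBufferConsumer createBufferConsumer() {
           // Each consumer gets its own reference.
           return new SketchBufferConsumer(buffer.retain());
       }

       // Always safe to call: releases only the builder's own reference.
       void recycle() {
           buffer.release();
       }
   }
   ```

   With a consumer, `builder.recycle()` followed by `consumer.close()` returns the segment only on the second call; without a consumer, `builder.recycle()` alone returns it. The caller never needs to know which case it is in.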
   
   In any case, here are some answers to your questions:
   
   - The contract is simple: `BufferBuilder#recycle()` should always be called when the `BufferBuilder` is no longer needed. You don't need to think about whether a `BufferConsumer` was created or not.
   - In general, renaming `recycle()` to `close()` makes sense to me, since `BufferBuilder` doesn't have a `retain` method and, ideally, should be closed after use (we can come back to this once we agree on a final solution).
   - There are a couple of problems that are still not resolved: writing to an already released `memorySegment`, and creating a `BufferConsumer` from an already closed `BufferBuilder`. Both can be resolved by the solution we already discussed (using `Buffer` instead of `memorySegment` inside `BufferBuilder`).
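
   Renaming `recycle()` to `close()` also gives a natural place to catch the two open problems. The sketch below, again with hypothetical names and a plain `byte[]` instead of a `MemorySegment`, implements `AutoCloseable` and tracks a closed flag, so writing after close or creating a consumer after close fails fast with `IllegalStateException` instead of touching memory that may already be back in the pool. It assumes single-threaded use; a concurrent version would need proper synchronization.

   ```java
   // Hypothetical builder; not Flink's BufferBuilder. Single-threaded by assumption.
   class ClosableBufferBuilder implements AutoCloseable {
       private final byte[] segment;
       private int refCnt = 1; // the builder's own reference; each consumer adds one
       private boolean closed;
       private boolean segmentRecycled;
       private int writerPosition;

       ClosableBufferBuilder(int size) {
           this.segment = new byte[size];
       }

       // Fails fast instead of writing into memory that may already be recycled.
       void append(byte b) {
           checkNotClosed();
           segment[writerPosition++] = b;
       }

       // The returned Runnable stands in for a BufferConsumer and releases its reference when run.
       Runnable createBufferConsumer() {
           checkNotClosed();
           refCnt++;
           return this::releaseReference;
       }

       // Idempotent: the builder's reference is released only once.
       @Override
       public void close() {
           if (!closed) {
               closed = true;
               releaseReference();
           }
       }

       boolean isSegmentRecycled() {
           return segmentRecycled;
       }

       private void releaseReference() {
           if (--refCnt == 0) {
               segmentRecycled = true; // a real implementation would return the segment to its pool
           }
       }

       private void checkNotClosed() {
           if (closed) {
               throw new IllegalStateException("BufferBuilder is already closed");
           }
       }
   }
   ```

   Used with try-with-resources (`try (ClosableBufferBuilder builder = new ClosableBufferBuilder(32)) { ... }`), the builder's reference is released on every path, which matches the contract that `close()` is always called, whether or not a consumer was created.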

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -120,38 +120,36 @@ public void checkpointStopped(long checkpointId) {
     }
 
     public void onRecoveredStateBuffer(Buffer buffer) {
-        boolean recycleBuffer = true;
         NetworkActionsLogger.traceRecover(
                 "InputChannelRecoveredStateHandler#recover",
                 buffer,
                 inputGate.getOwningTaskName(),
                 channelInfo);
-        try {
-            final boolean wasEmpty;
-            synchronized (receivedBuffers) {
-                // Similar to notifyBufferAvailable(), make sure that we never add a buffer
-                // after releaseAllResources() released all buffers from receivedBuffers.
-                if (isReleased) {
-                    wasEmpty = false;
-                } else {
-                    wasEmpty = receivedBuffers.isEmpty();
-                    receivedBuffers.add(buffer);
-                    recycleBuffer = false;
-                }
-            }
 
-            if (wasEmpty) {
-                notifyChannelNonEmpty();
-            }
-        } finally {
-            if (recycleBuffer) {
-                buffer.recycleBuffer();
+        final boolean wasEmpty;
+        synchronized (receivedBuffers) {
+            // Similar to notifyBufferAvailable(), make sure that we never add a buffer
+            // after releaseAllResources() released all buffers from receivedBuffers.
+            if (isReleased) {
+                wasEmpty = false;
+            } else {
+                wasEmpty = receivedBuffers.isEmpty();
+                receivedBuffers.add(buffer.retainBuffer());

Review comment:
       Just a heads-up: these changes are not the goal of the current task, so I can easily revert them.
   
   Of course, it is not OK that different parts of the code are inconsistent with each other. So I will change everything in the same manner once we agree on the solution (or do nothing if we decide to leave everything as is).
   
   Regarding the original question: as far as I can see, there is currently no clear pattern for using retain/release. That is why I want to find something that will be understandable to everybody. In my opinion, the classic reference/resource-counting pattern looks like this:
   
   - If you request a resource, you should release it no matter what happens next.
   - If you share a resource, you should increase the counter (ideally, you can share it only via a specific structure that does all the work for you, e.g. a `BufferList` instead of a `List<Buffer>`, which retains buffers automatically).
   - If you are a passive consumer that receives resources from outside, you should do nothing to manage those resources.
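
   The `BufferList` idea from the second rule could look roughly like this. It is a hypothetical class (not an existing Flink type), sketched over a minimal hypothetical ref-counted `Buf` with `retain()`/`release()`: adding an element retains it, so the caller keeps and releases its own reference as usual; `poll()` hands the list's reference over to the caller, and `clear()` releases whatever the list still holds.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical minimal ref-counted resource (single-threaded by assumption).
   class Buf {
       private int refCnt = 1; // the creator holds the first reference

       Buf retain() {
           refCnt++;
           return this;
       }

       void release() {
           if (refCnt <= 0) {
               throw new IllegalStateException("already released");
           }
           refCnt--;
       }

       int refCount() {
           return refCnt;
       }
   }

   // Hypothetical list that owns one reference per element it holds.
   class BufferList {
       private final List<Buf> buffers = new ArrayList<>();

       // Sharing: the list takes its own reference; the caller keeps theirs.
       void add(Buf buf) {
           buffers.add(buf.retain());
       }

       // Moves the list's reference to the caller, who must release it.
       Buf poll() {
           return buffers.isEmpty() ? null : buffers.remove(0);
       }

       // Releases every reference the list still holds.
       void clear() {
           for (Buf buf : buffers) {
               buf.release();
           }
           buffers.clear();
       }

       int size() {
           return buffers.size();
       }
   }
   ```

   The caller's code then follows the first rule unchanged (`try { list.add(buf); } finally { buf.release(); }`), and the list cannot forget to retain.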
   
   ```java
   void wrong() {
     Buffer buf = getOrCreate(); // requests a resource but never releases it
     // ... use buf ...
   }
   void right() {
     Buffer buf = getOrCreate(); // requests a resource
     try {
       // ... use buf ...
     } finally {
       buf.release(); // always released, whatever happens in between
     }
   }
   ```
   
   ```java
   void wrong(Buffer buf) {
     buf.release(); // how can I be sure that the counter is high enough?

     buf.retain(); // retain without release can surprise the caller of this method
   }
   void right(Buffer buf) {
     buf.retain(); // useless but possible
     try {
       // ... use buf ...
     } finally {
       buf.release();
     }
   }
   ```
   
   ```java
   void wrong() {
     Buffer buf = getOrCreate(); // requests a resource
     list.add(buf); // too dangerous: can lead to a leak after refactoring,
                    // but acceptable in extreme cases for good performance
   }
   void right() {
     Buffer buf = getOrCreate(); // requests a resource
     try {
       // A BufferList that retains automatically would be better, but this works too.
       list.add(buf.retain());
     } finally {
       buf.release();
     }
   }
   ```
   ```java
   void right(Buffer buf) {
     // A BufferList that retains automatically would be better, but this works too.
     list.add(buf.retain());

     // Again, something more specific for sharing resources with other threads
     // would be better, but this also works. The task owns the extra reference.
     buf.retain();
     executor.submit(() -> {
       try {
         // ... some work with buf ...
       } finally {
         buf.release();
       }
     });
   }
   ```
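
   The last pattern, handing a retained reference to another thread, can be sketched as a runnable example using `java.util.concurrent`. `SharedBuf` and `HandOff` are hypothetical names; `SharedBuf` is a thread-safe ref-counted buffer built on `AtomicInteger`. The submitted task owns the extra reference and releases it in `finally`, so the count drops back to zero once both the caller and the task are done.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Future;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical thread-safe ref-counted buffer.
   class SharedBuf {
       private final AtomicInteger refCnt = new AtomicInteger(1); // creator's reference

       SharedBuf retain() {
           refCnt.incrementAndGet();
           return this;
       }

       void release() {
           if (refCnt.decrementAndGet() < 0) {
               throw new IllegalStateException("released more often than retained");
           }
       }

       int refCount() {
           return refCnt.get();
       }
   }

   class HandOff {
       // Passes one reference to the task; the task is responsible for releasing it.
       static Future<?> shareWithTask(ExecutorService executor, SharedBuf buf) {
           SharedBuf shared = buf.retain();
           return executor.submit(() -> {
               try {
                   // ... some work with the buffer ...
               } finally {
                   shared.release();
               }
           });
       }
   }
   ```

   The caller still releases its own reference in its own `finally`, exactly as in the first rule; the only thing that crosses the thread boundary is the one extra reference.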
   
   So let's discuss: what do you think about it? Maybe you can also share your rules for using the retain/release pattern, which I couldn't infer from the code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

