jiangxin369 commented on code in PR #23957:
URL: https://github.com/apache/flink/pull/23957#discussion_r1444373791


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java:
##########
@@ -129,6 +129,7 @@ public boolean tryWrite(
         }
         if (finishedBuffer.isBuffer()) {
             memoryManager.transferBufferOwnership(bufferOwner, this, finishedBuffer);
+            memoryManager.ensureCapacity();

Review Comment:
   Yes, we should ensure the capacity before starting a new segment rather than 
when transferring ownership. I've changed the implementation; please take a look.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -91,6 +94,15 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
      */
     private final Map<Object, Integer> numOwnerRequestedBuffers;
 
+    /**
+     * The queue that contains all available buffers. This field should be thread-safe because it
+     * can be touched both by the task thread and the netty thread.
+     */
+    private final BlockingQueue<MemorySegment> bufferQueue;

Review Comment:
   The `BlockingQueue` implementations are required to be thread-safe according 
to the Java API doc.
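
   For reference, here is a minimal sketch of that guarantee, independent of 
Flink. The class and method names (`BlockingQueueDemo`, `produceAndDrain`) are 
hypothetical and only illustrate the point; `Integer` stands in for 
`MemorySegment`. Several producer threads add to one shared 
`LinkedBlockingQueue` while the calling thread takes from it, with no extra 
synchronization. Note that the Javadoc limits the atomicity guarantee to the 
queuing methods; bulk operations such as `addAll` are not necessarily atomic.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {

    /**
     * Starts {@code producers} threads that each add {@code perProducer} items to a
     * shared queue while the calling thread takes them out. Returns the number of
     * items taken. No external locking is used around the queue.
     */
    public static int produceAndDrain(int producers, int perProducer) {
        // Unbounded by default, so add() never fails for lack of capacity here.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    queue.add(i);
                }
            });
            threads[p].start();
        }

        int taken = 0;
        try {
            // take() blocks until an element is available, so this loop ends
            // only after every produced element has been consumed.
            for (int i = 0; i < producers * perProducer; i++) {
                queue.take();
                taken++;
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(produceAndDrain(2, 1000));
    }
}
```

   Since the field is declared as `BlockingQueue`, any implementation assigned 
to it carries this contract, so the comment's thread-safety requirement is met 
by the type itself.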


