jiangxin369 commented on code in PR #23957:
URL: https://github.com/apache/flink/pull/23957#discussion_r1444373791

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java:
##########
@@ -129,6 +129,7 @@ public boolean tryWrite(
         }
         if (finishedBuffer.isBuffer()) {
             memoryManager.transferBufferOwnership(bufferOwner, this, finishedBuffer);
+            memoryManager.ensureCapacity();

Review Comment:
   Yes, we should ensure the capacity before starting a new segment rather than when transferring buffer ownership. I've changed the implementation; please take a look.


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -91,6 +94,15 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
      */
     private final Map<Object, Integer> numOwnerRequestedBuffers;

+    /**
+     * The queue that contains all available buffers. This field should be thread-safe because it
+     * can be touched both by the task thread and the netty thread.
+     */
+    private final BlockingQueue<MemorySegment> bufferQueue;

Review Comment:
   According to the Java API documentation, `BlockingQueue` implementations are required to be thread-safe.
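
For illustration only, here is a minimal sketch of the thread-safety point made in the second comment. It is not the PR's code: `BufferQueueSketch`, `transfer`, and the use of `byte[]` in place of `MemorySegment` are assumptions made so the example runs without Flink on the classpath. One thread plays the netty thread returning buffers, another plays the task thread requesting them, and no extra locking is placed around the queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the buffer queue discussed above; not Flink code. */
final class BufferQueueSketch {

    /**
     * Adds {@code count} placeholder buffers from one thread while a second thread
     * takes them out, and returns how many buffers the second thread received.
     */
    static int transfer(int count) {
        // byte[] stands in for Flink's MemorySegment (assumption for illustration).
        BlockingQueue<byte[]> bufferQueue = new LinkedBlockingQueue<>();
        int[] received = new int[1];

        // Plays the role of the netty thread returning buffers to the queue.
        Thread recycler = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                bufferQueue.add(new byte[32]);
            }
        });

        // Plays the role of the task thread requesting buffers.
        Thread requester = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    byte[] buffer = bufferQueue.poll(5, TimeUnit.SECONDS);
                    if (buffer == null) {
                        return; // timed out waiting for a buffer
                    }
                    received[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        recycler.start();
        requester.start();
        try {
            // join() makes the requester's writes to received[0] visible here.
            recycler.join();
            requester.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(transfer(1000));
    }
}
```

The single-element queuing methods used here (`add`, `poll`) are atomic per the `BlockingQueue` contract. The same Javadoc notes that bulk operations such as `addAll` are not necessarily atomic, so code that relies on them may still need its own coordination.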