reswqa commented on code in PR #22084: URL: https://github.com/apache/flink/pull/22084#discussion_r1127489392
########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java: ########## @@ -242,23 +243,129 @@ public void testRecycleAfterDestroy() { localBufferPool.lazyDestroy(); // All buffers have been requested, but can not be returned yet. - assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool()); + assertThat(getNumRequestedFromMemorySegmentPool()).isEqualTo(numBuffers); // Recycle should return buffers to memory segment pool for (Buffer buffer : requests) { buffer.recycleBuffer(); } } + @Test + void testDecreasePoolSize() throws Exception { + LocalBufferPool bufferPool = + new LocalBufferPool(networkBufferPool, 4, 10, 0, Integer.MAX_VALUE, 2); + Queue<MemorySegment> buffers = new LinkedList<>(); + + // set pool size to 5. + bufferPool.setNumBuffers(5); + assertThat(bufferPool.getNumBuffers()).isEqualTo(5); + + // request all buffer. + for (int i = 0; i < 5; i++) { + buffers.add(bufferPool.requestMemorySegmentBlocking()); + } + assertThat(bufferPool.isAvailable()).isFalse(); + + // request 1 overdraft buffers. + buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); + assertThat(bufferPool.isAvailable()).isFalse(); + + // set pool size to 4. + bufferPool.setNumBuffers(4); + assertThat(bufferPool.getNumBuffers()).isEqualTo(4); + assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero(); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); + assertThat(bufferPool.isAvailable()).isFalse(); + buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2); + assertThat(bufferPool.isAvailable()).isFalse(); + + // return all overdraft buffers. 
+ bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isOne(); + assertThat(bufferPool.isAvailable()).isFalse(); + bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isZero(); + assertThat(bufferPool.isAvailable()).isFalse(); + + // return the excess buffer. + bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.isAvailable()).isFalse(); + // return non-excess buffers. + bufferPool.recycle(buffers.poll()); + assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne(); + assertThat(bufferPool.isAvailable()).isTrue(); + + while (!buffers.isEmpty()) { + bufferPool.recycle(buffers.poll()); + } + bufferPool.lazyDestroy(); + } + + @Test + void testIncreasePoolSize() throws Exception { + LocalBufferPool bufferPool = + new LocalBufferPool(networkBufferPool, 5, 100, 0, Integer.MAX_VALUE, 2); + List<MemorySegment> buffers = new ArrayList<>(); + + // set pool size to 5. + bufferPool.setNumBuffers(5); + assertThat(bufferPool.getNumBuffers()).isEqualTo(5); + + // request all buffer. + for (int i = 0; i < 5; i++) { + buffers.add(bufferPool.requestMemorySegmentBlocking()); + } + assertThat(bufferPool.isAvailable()).isFalse(); + + // request 2 overdraft buffers. + buffers.add(bufferPool.requestMemorySegmentBlocking()); + buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.requestMemorySegment()).isNull(); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2); + assertThat(bufferPool.isAvailable()).isFalse(); + + // set pool size to 10. + bufferPool.setNumBuffers(10); + assertThat(bufferPool.getNumBuffers()).isEqualTo(10); + assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne(); + assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments()).isEqualTo(2); + // available status will not be influenced by overdraft. 
+ assertThat(bufferPool.isAvailable()).isTrue(); + buffers.add(bufferPool.requestMemorySegmentBlocking()); + assertThat(bufferPool.isAvailable()).isTrue(); Review Comment: Good catch! I think the second one is better. If the pool size becomes larger, some overdraft buffers need to be converted to requested buffers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org