reswqa commented on code in PR #23957: URL: https://github.com/apache/flink/pull/23957#discussion_r1445651286
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -244,6 +251,58 @@ public void release() { ExceptionUtils.rethrow(e); } } + while (!bufferQueue.isEmpty()) { + MemorySegment segment = bufferQueue.poll(); + bufferPool.recycle(segment); + numRequestedBuffers.decrementAndGet(); + } + } + + /** + * Returns a memory segment from the buffer pool or null if the memory manager has requested all + * segments of the buffer pool. + */ + private MemorySegment requestBufferBlockingFromPool() { Review Comment: This should be marked as `Nullable` or return an `Optional<MemorySegment>`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -173,49 +187,42 @@ public BufferBuilder requestBufferBlocking(Object owner) { reclaimBuffersIfNeeded(0); - CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>(); - scheduleCheckRequestBufferFuture( - requestBufferFuture, INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS); - MemorySegment memorySegment = bufferPool.requestMemorySegment(); - + MemorySegment memorySegment = bufferQueue.poll(); if (memorySegment == null) { - try { - hardBackpressureTimerGauge.markStart(); - memorySegment = bufferPool.requestMemorySegmentBlocking(); - hardBackpressureTimerGauge.markEnd(); - } catch (InterruptedException e) { - ExceptionUtils.rethrow(e); - } + memorySegment = requestBufferBlockingFromPool(); + } + if (memorySegment == null) { + memorySegment = requestBufferBlockingFromQueue(); Review Comment: ```suggestion memorySegment = checkNotNull(requestBufferBlockingFromQueue()); ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -244,6 +251,58 @@ public void release() { ExceptionUtils.rethrow(e); } } + while 
(!bufferQueue.isEmpty()) { + MemorySegment segment = bufferQueue.poll(); + bufferPool.recycle(segment); + numRequestedBuffers.decrementAndGet(); + } + } + + /** + * Returns a memory segment from the buffer pool or null if the memory manager has requested all + * segments of the buffer pool. + */ Review Comment: This appears to be describing the `@return` of this method rather than java doc. ########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java: ########## @@ -192,15 +188,34 @@ void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception { try (TieredResultPartition partition = createTieredStoreResultPartition(2, bufferPool, true)) { partition.broadcastRecord(ByteBuffer.allocate(bufferSize)); - IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot(); - assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1); - ResultPartitionBytes partitionBytes = - ioMetrics.getResultPartitionBytes().values().iterator().next(); - assertThat(partitionBytes.getSubpartitionBytes()) - .containsExactly(bufferSize, bufferSize); + verifySubpartitionBytes(bufferSize, bufferSize); } } + @Test + void testRequestBuffersAfterPoolSizeDecreased() throws IOException { + final int numBuffers = 15; + final int numRecords = 100; + BufferPool bufferPool = globalPool.createBufferPool(1, numBuffers); + TieredResultPartition resultPartition = + createTieredStoreResultPartitionWithStorageManager(1, bufferPool, false); + + // Emits some records to fill up all buffers of memory tier, these buffers would not be + // recycled until the subpartitionView is released manually. + for (int i = 0; i < numRecords; i++) { + resultPartition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE), 0); + } + verifySubpartitionBytes(numRecords * NETWORK_BUFFER_SIZE); + + bufferPool.setNumBuffers(1); Review Comment: Better to check that there are no more buffers in the buffer pool before this. 
########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.java: ########## @@ -242,19 +173,49 @@ void testCanNotTransferOwnershipForEvent() throws IOException { .isInstanceOf(IllegalStateException.class); } + @Test + void testEnsureCapacity() throws IOException { + final int numBuffers = 5; + final int guaranteedReclaimableBuffers = 3; + + BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers); + TieredStorageMemoryManagerImpl storageMemoryManager = + createStorageMemoryManager( + bufferPool, + Arrays.asList( + new TieredStorageMemorySpec( + new Object(), guaranteedReclaimableBuffers, true), + new TieredStorageMemorySpec(this, 0, false))); + assertThat(storageMemoryManager.ensureCapacity(0)).isTrue(); + assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()) + .isEqualTo(guaranteedReclaimableBuffers); + + assertThat(storageMemoryManager.ensureCapacity(numBuffers - guaranteedReclaimableBuffers)) + .isTrue(); + assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numBuffers); + + assertThat( + storageMemoryManager.ensureCapacity( + numBuffers - guaranteedReclaimableBuffers + 1)) Review Comment: IIUC, we have already requested all buffers of this pool. So why is there an elaborate value `numBuffers - guaranteedReclaimableBuffers + 1` instead of `1`? 
########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java: ########## @@ -192,15 +188,34 @@ void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception { try (TieredResultPartition partition = createTieredStoreResultPartition(2, bufferPool, true)) { partition.broadcastRecord(ByteBuffer.allocate(bufferSize)); - IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot(); - assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1); - ResultPartitionBytes partitionBytes = - ioMetrics.getResultPartitionBytes().values().iterator().next(); - assertThat(partitionBytes.getSubpartitionBytes()) - .containsExactly(bufferSize, bufferSize); + verifySubpartitionBytes(bufferSize, bufferSize); } } + @Test + void testRequestBuffersAfterPoolSizeDecreased() throws IOException { Review Comment: Will this test fail before this PR? ########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java: ########## @@ -192,15 +188,34 @@ void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception { try (TieredResultPartition partition = createTieredStoreResultPartition(2, bufferPool, true)) { partition.broadcastRecord(ByteBuffer.allocate(bufferSize)); - IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot(); - assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1); - ResultPartitionBytes partitionBytes = - ioMetrics.getResultPartitionBytes().values().iterator().next(); - assertThat(partitionBytes.getSubpartitionBytes()) - .containsExactly(bufferSize, bufferSize); + verifySubpartitionBytes(bufferSize, bufferSize); } } + @Test + void testRequestBuffersAfterPoolSizeDecreased() throws IOException { + final int numBuffers = 15; + final int numRecords = 100; + BufferPool bufferPool = globalPool.createBufferPool(1, numBuffers); + TieredResultPartition resultPartition = + 
createTieredStoreResultPartitionWithStorageManager(1, bufferPool, false); + + // Emits some records to fill up all buffers of memory tier, these buffers would not be + // recycled until the subpartitionView is released manually. + for (int i = 0; i < numRecords; i++) { Review Comment: How about only writing records until the buffer pool has no available buffers? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java: ########## @@ -173,49 +187,42 @@ public BufferBuilder requestBufferBlocking(Object owner) { reclaimBuffersIfNeeded(0); - CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>(); - scheduleCheckRequestBufferFuture( - requestBufferFuture, INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS); - MemorySegment memorySegment = bufferPool.requestMemorySegment(); - + MemorySegment memorySegment = bufferQueue.poll(); if (memorySegment == null) { - try { - hardBackpressureTimerGauge.markStart(); - memorySegment = bufferPool.requestMemorySegmentBlocking(); - hardBackpressureTimerGauge.markEnd(); - } catch (InterruptedException e) { - ExceptionUtils.rethrow(e); - } + memorySegment = requestBufferBlockingFromPool(); + } + if (memorySegment == null) { + memorySegment = requestBufferBlockingFromQueue(); Review Comment: I believe we need a sanity check here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org