TanYuxin-tyx commented on code in PR #23851:
URL: https://github.com/apache/flink/pull/23851#discussion_r1472451043
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -37,8 +37,16 @@ public class TieredStorageConfiguration {

     private static final String DEFAULT_REMOTE_STORAGE_BASE_PATH = null;

+    private static final boolean DEFAULT_MEMORY_DECOUPLING_ENABLED = false;

Review Comment:
   By default, the memory decoupling switch is false. When do we expect it to be enabled?


##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java:
##########
@@ -39,6 +40,10 @@
  */
 public class NettyShuffleUtils {

+    // Temporarily declare the default value here, it would be moved to the configuration class
+    // later.

Review Comment:
   We should use a `/** */` Javadoc comment here instead of a line comment.


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -335,19 +338,22 @@ private MemorySegment requestBufferBlockingFromPool() {
     }

     /** @return a memory segment from the internal buffer queue. */
-    private MemorySegment requestBufferBlockingFromQueue() {
+    @Nullable
+    private MemorySegment requestBufferFromQueue() {
         CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>();
         scheduleCheckRequestBufferFuture(
                 requestBufferFuture, INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS);
+        hardBackpressureTimerGauge.markStart();

Review Comment:
   Marking the start and end inside the loop can distort the real value of `hardBackpressureTimerGauge` (e.g., `previousMaxSingleMeasurement` in `TimerGauge`). The start mark can be moved to just before the `while` loop begins, and the end mark to just after the loop ends.
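   A rough sketch of the suggested placement (the method structure below is an assumption based on the diff, not the PR's actual code):

   ```java
   private MemorySegment requestBufferBlockingFromPool() {
       MemorySegment segment = requestBufferFromQueue();
       if (segment != null) {
           return segment;
       }
       // Mark once around the whole retry loop, so one long stall is recorded
       // as a single measurement (previousMaxSingleMeasurement) rather than
       // many short ones.
       hardBackpressureTimerGauge.markStart();
       while (segment == null) {
           // Assumed for illustration: requestBufferFromQueue() waits on the
           // request future internally and returns null on timeout.
           segment = requestBufferFromQueue();
       }
       hardBackpressureTimerGauge.markEnd();
       return segment;
   }
   ```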
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java:
##########
@@ -132,16 +134,44 @@ private boolean shouldContinueRequest(BufferPool bufferPool) {
         }
     }

-    /** Requests exclusive buffers from the provider. */
-    void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
-        checkArgument(numExclusiveBuffers >= 0, "Num exclusive buffers must be non-negative.");
-        if (numExclusiveBuffers == 0) {
-            return;
+    private void resizeBufferQueue() {
+        SingleInputGate inputGate = inputChannel.inputGate;
+        int currentSize = inputGate.getBufferPool().getNumBuffers();
+        int numRemoteChannels =
+                inputGate.getNumberOfInputChannels() - inputGate.getNumberOfLocalInputChannels();
+        if (numRemoteChannels == 0) {
+            numExclusiveBuffers = 0;
+        } else if (currentSize > 1 && currentSize != bufferPoolSize) {
+            int targetExclusivePerChannel =
+                    Math.min(initialCredit, (currentSize - 1) / numRemoteChannels);

Review Comment:
   Why do we use `currentSize - 1` here?


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortBuffer.java:
##########
@@ -184,6 +184,27 @@ public boolean append(ByteBuffer source, int targetSubpartition, Buffer.DataType
         return false;
     }

+    /**
+     * Try to release some unused memory segments.
+     *
+     * <p>Note that this class is not thread safe, so please make sure to call {@link
+     * #append(ByteBuffer source, int targetSubpartition, Buffer.DataType dataType)} and this method
+     * with lock acquired.
+     *
+     * @param numFreeSegments the number of segments to be released.
+     * @return true if released successfully, otherwise false.
+     */
+    public boolean returnFreeSegments(int numFreeSegments) {
+        if (numFreeSegments < numGuaranteedBuffers - segments.size()) {
+            for (int i = 0; i < numFreeSegments; i++) {
+                bufferRecycler.recycle(freeSegments.poll());

Review Comment:
   I have a concern here. Directly polling a buffer and recycling it may not be right. We should ensure that enough buffers remain for reading data back from the sort buffer; only when enough remain for reading can we recycle buffers safely. (We can record the initial total number of buffers and decide whether to recycle based on how many are left.)
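   A minimal sketch of that idea (the fields `numTotalBuffers` and `numBuffersForReading` are hypothetical, added only for illustration):

   ```java
   public boolean returnFreeSegments(int numFreeSegments) {
       // Only succeed if enough buffers remain afterwards for the read path
       // and the free queue actually holds that many segments.
       if (numTotalBuffers - numFreeSegments < numBuffersForReading
               || freeSegments.size() < numFreeSegments) {
           return false;
       }
       for (int i = 0; i < numFreeSegments; i++) {
           bufferRecycler.recycle(freeSegments.poll());
       }
       numTotalBuffers -= numFreeSegments;
       return true;
   }
   ```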
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java:
##########
@@ -208,14 +238,21 @@ private int tryRequestBuffers() {
      */
     @Override
     public void recycle(MemorySegment segment) {
+        @Nullable Buffer releasedFloatingBuffer = null;
         synchronized (bufferQueue) {
+            resizeBufferQueue();
             try {
+                BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
                 // Similar to notifyBufferAvailable(), make sure that we never add a buffer
                 // after channel released all buffers via releaseAllResources().
                 if (inputChannel.isReleased()) {
-                    globalPool.recycleUnpooledMemorySegments(Collections.singletonList(segment));
+                    bufferPool.recycle(segment);
                     return;
+                } else if (inputChannel instanceof RecoveredInputChannel

Review Comment:
   As we discussed, during recovery the buffers should be recycled instead of added to the exclusive buffers, because the number of exclusive buffers is calculated from the number of remote channels. If these buffers were wrongly added to a local recovered input channel, the buffers could be used up. I'm not sure whether we should document the reason for the `inputChannel instanceof RecoveredInputChannel` condition in more detail.


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java:
##########
@@ -275,11 +274,6 @@ protected int getNumberOfQueuedBuffers() {
     }

     public Buffer requestBufferBlocking() throws InterruptedException, IOException {
-        // not in setup to avoid assigning buffers unnecessarily if there is no state

Review Comment:
   For the recovered input channel, do we still need this change if we set networkBuffersPerChannel to 0?


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java:
##########
@@ -132,16 +134,44 @@ private boolean shouldContinueRequest(BufferPool bufferPool) {
         }
     }

-    /** Requests exclusive buffers from the provider. */
-    void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
-        checkArgument(numExclusiveBuffers >= 0, "Num exclusive buffers must be non-negative.");
-        if (numExclusiveBuffers == 0) {
-            return;
+    private void resizeBufferQueue() {
+        SingleInputGate inputGate = inputChannel.inputGate;
+        int currentSize = inputGate.getBufferPool().getNumBuffers();
+        int numRemoteChannels =
+                inputGate.getNumberOfInputChannels() - inputGate.getNumberOfLocalInputChannels();
+        if (numRemoteChannels == 0) {
+            numExclusiveBuffers = 0;
+        } else if (currentSize > 1 && currentSize != bufferPoolSize) {
+            int targetExclusivePerChannel =

Review Comment:
   Is `targetExclusivePerChannel` unused?


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -244,18 +246,8 @@ public int getMaxNonReclaimableBuffers(Object owner) {
     public boolean ensureCapacity(int numAdditionalBuffers) {
         checkIsInitialized();

-        final int numRequestedByGuaranteedReclaimableOwners =

Review Comment:
   Is this a bug fix? If so, we'd better separate it out as a hotfix commit.


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java:
##########
@@ -600,73 +605,133 @@ private void redistributeBuffers() {
         }

         // All buffers, which are not among the required ones
-        final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+        int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;

         if (numAvailableMemorySegment == 0) {
             // in this case, we need to redistribute buffers so that every pool gets its minimum
             for (LocalBufferPool bufferPool : resizableBufferPools) {
-                bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
+                bufferPool.setNumBuffers(bufferPool.getMinNumberOfMemorySegments());
             }
             return;
         }

-        /*
-         * With buffer pools being potentially limited, let's distribute the available memory
-         * segments based on the capacity of each buffer pool, i.e. the maximum number of segments
-         * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
-         * it may be less. Based on this and the sum of all these values (totalCapacity), we build
-         * a ratio that we use to distribute the buffers.
-         */
-
-        long totalCapacity = 0; // long to avoid int overflow

Review Comment:
   The whole redistribution process is a bit complex and hard to understand. We'd better add a detailed description here to explain the process (a standalone sketch of the general idea follows at the end of this message). In addition, we should add more tests to cover the corner cases.


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -79,7 +79,9 @@ public TieredStorageProducerClient(
         this.currentSubpartitionTierAgent = new TierProducerAgent[numSubpartitions];

         Arrays.fill(currentSubpartitionSegmentId, -1);
+    }

Review Comment:
   Is `setUp` now unused?
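   On the `redistributeBuffers()` comment above: a standalone sketch of the ratio-based process that the removed code comment described (all names below are hypothetical; this is an illustration, not the PR's implementation):

   ```java
   /**
    * Illustration only: give each pool its minimum, then split the surplus
    * proportionally to each pool's remaining capacity (an unlimited pool's
    * capacity is capped at the number of available segments), using long
    * arithmetic to avoid int overflow.
    */
   static int[] redistribute(int numAvailableSegments, int[] minPerPool, int[] maxPerPool) {
       int[] result = minPerPool.clone();
       long totalCapacity = 0; // long to avoid int overflow
       for (int i = 0; i < result.length; i++) {
           totalCapacity += Math.min(numAvailableSegments, maxPerPool[i] - minPerPool[i]);
       }
       if (totalCapacity == 0) {
           return result; // no pool can take more than its minimum
       }
       for (int i = 0; i < result.length; i++) {
           long capacity = Math.min(numAvailableSegments, maxPerPool[i] - minPerPool[i]);
           // Rounding down can leave a few segments undistributed; real code
           // would also need to hand out the remainder.
           result[i] += (int) (numAvailableSegments * capacity / totalCapacity);
       }
       return result;
   }
   ```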