xintongsong commented on code in PR #23851:
URL: https://github.com/apache/flink/pull/23851#discussion_r1461707655


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/CreditBasedInputBuffersUsageGauge.java:
##########
@@ -28,27 +31,35 @@
  */
 public class CreditBasedInputBuffersUsageGauge extends AbstractBuffersUsageGauge {
 
-    private final FloatingBuffersUsageGauge floatingBuffersUsageGauge;
-    private final ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge;
-
-    public CreditBasedInputBuffersUsageGauge(
-            FloatingBuffersUsageGauge floatingBuffersUsageGauge,
-            ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
-            SingleInputGate[] inputGates) {
+    public CreditBasedInputBuffersUsageGauge(SingleInputGate[] inputGates) {

Review Comment:
   It seems that `AbstractBuffersUsageGauge`, `calculateUsedBuffers`, and `calculateTotalBuffers` exist only to compute `AbstractBuffersUsageGauge#getValue`.
   1. Do we still need `FloatingBuffersUsageGauge` / `ExclusiveBuffersUsageGauge` at all?
   2. Can we simply calculate the usage from the used / total buffers of the pool?
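
   To illustrate point 2, here is a rough sketch of what I have in mind. `PoolUsageSketch` and its array parameters are hypothetical stand-ins, not the actual `LocalBufferPool` or gauge API; the point is only that the gauge value reduces to a single division once each pool reports its used and total buffers.

```java
/** Hypothetical sketch; the names here are illustrative and not part of Flink. */
final class PoolUsageSketch {

    /**
     * Computes usage as (sum of used buffers) / (sum of total buffers).
     * usedPerPool[i] and totalPerPool[i] describe the same (assumed) pool.
     */
    static float usage(int[] usedPerPool, int[] totalPerPool) {
        int used = 0;
        int total = 0;
        for (int i = 0; i < usedPerPool.length; i++) {
            used += usedPerPool[i];
            total += totalPerPool[i];
        }
        // Guard against division by zero when no buffers are assigned yet.
        return total == 0 ? 0.0f : (float) used / total;
    }
}
```

   If the pools can expose these two numbers directly, the separate floating / exclusive gauges may no longer be needed for this computation.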



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -286,13 +312,13 @@ public int getMaxNumberOfMemorySegments() {
      *
      * @return the same value as {@link #getMaxNumberOfMemorySegments()} for bounded pools. For
      *     unbounded pools it returns an approximation based upon {@link
-     *     #getNumberOfRequiredMemorySegments()}
+     *     #getExpectedNumberOfMemorySegments()}
      */
     public int getEstimatedNumberOfRequestedMemorySegments() {
         if (maxNumberOfMemorySegments < NetworkBufferPool.UNBOUNDED_POOL_SIZE) {
             return maxNumberOfMemorySegments;
         } else {
-            return getNumberOfRequiredMemorySegments() * 2;
+            return getMinNumberOfMemorySegments() * 2;

Review Comment:
   This min-times-2 estimation doesn't really make sense. I'd suggest simply using the expected number of segments.
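
   Roughly what I mean, as a standalone sketch. `EstimatedSegmentsSketch` is a hypothetical class, and the `UNBOUNDED_POOL_SIZE` constant below is my assumption of the sentinel that `NetworkBufferPool.UNBOUNDED_POOL_SIZE` stands for.

```java
/** Hypothetical sketch; not the actual LocalBufferPool code. */
final class EstimatedSegmentsSketch {

    // Assumed sentinel for "no upper bound", standing in for NetworkBufferPool.UNBOUNDED_POOL_SIZE.
    static final int UNBOUNDED_POOL_SIZE = Integer.MAX_VALUE;

    /** Bounded pools report their maximum; unbounded pools report the expected size. */
    static int estimatedSegments(int maxSegments, int expectedSegments) {
        return maxSegments < UNBOUNDED_POOL_SIZE ? maxSegments : expectedSegments;
    }
}
```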



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java:
##########
@@ -600,73 +607,97 @@ private void redistributeBuffers() {
         }
 
         // All buffers, which are not among the required ones
-        final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+        int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
 
         if (numAvailableMemorySegment == 0) {
             // in this case, we need to redistribute buffers so that every pool gets its minimum
             for (LocalBufferPool bufferPool : resizableBufferPools) {
-                bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
+                bufferPool.setNumBuffers(bufferPool.getMinNumberOfMemorySegments());
             }
             return;
         }
 
-        /*
-         * With buffer pools being potentially limited, let's distribute the available memory
-         * segments based on the capacity of each buffer pool, i.e. the maximum number of segments
-         * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
-         * it may be less. Based on this and the sum of all these values (totalCapacity), we build
-         * a ratio that we use to distribute the buffers.
-         */
+        Map<LocalBufferPool, Integer> cachedPoolSize =
+                resizableBufferPools.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Function.identity(),
+                                        LocalBufferPool::getMinNumberOfMemorySegments));
+
+        while (true) {
+            int remaining = redistributeBuffers(numAvailableMemorySegment, cachedPoolSize);
+
+            // Stop the loop iteration when there is no remaining segments or all local buffer pools
+            // have reached the max number.
+            if (remaining == 0 || remaining == numAvailableMemorySegment) {
+                for (LocalBufferPool bufferPool : resizableBufferPools) {
+                    bufferPool.setNumBuffers(
+                            cachedPoolSize.getOrDefault(
+                                    bufferPool, bufferPool.getMinNumberOfMemorySegments()));
+                }
+                break;
+            }
+            numAvailableMemorySegment = remaining;
+        }
+    }
 
-        long totalCapacity = 0; // long to avoid int overflow
+    /**
+     * @param numBuffersToRedistribute the buffers to be redistributed.
+     * @param cachedPoolSize the map to cache the intermediate result.
+     * @return the remaining buffers that can continue to be redistributed.
+     */
+    private int redistributeBuffers(

Review Comment:
   I don't really follow the calculation logic here. Shouldn't we use the expected number of buffers as the weight?
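
   For reference, a minimal sketch of the weighting I would expect, under my own assumptions: every pool starts at its minimum, the spare buffers are handed out in proportion to each pool's expected size, and no pool exceeds its maximum. `WeightedRedistributionSketch` and the array-based signature are hypothetical and only meant to show the arithmetic, not to replace the method in this PR.

```java
/** Hypothetical sketch of expected-size-weighted redistribution; not Flink code. */
final class WeightedRedistributionSketch {

    /**
     * @param available spare buffers beyond the sum of all minimums
     * @param min per-pool minimum sizes (starting point)
     * @param expected per-pool expected sizes, used as weights
     * @param max per-pool upper bounds
     * @return the resulting size of each pool
     */
    static int[] redistribute(int available, int[] min, int[] expected, int[] max) {
        int[] size = min.clone();
        int remaining = available;
        while (remaining > 0) {
            // Only pools that can still grow take part in this round.
            long totalWeight = 0;
            for (int i = 0; i < size.length; i++) {
                if (size[i] < max[i]) {
                    totalWeight += expected[i];
                }
            }
            if (totalWeight == 0) {
                break; // every pool has reached its maximum
            }
            int budget = remaining;
            for (int i = 0; i < size.length && remaining > 0; i++) {
                if (size[i] >= max[i]) {
                    continue;
                }
                long proportional = (long) budget * expected[i] / totalWeight;
                // Hand out at least one buffer so that rounding never stalls the loop.
                int share = (int) Math.max(1L, proportional);
                share = Math.min(share, Math.min(max[i] - size[i], remaining));
                size[i] += share;
                remaining -= share;
            }
        }
        return size;
    }
}
```

   Buffers that a capped pool cannot take simply flow into the next round, where they are shared among the pools that can still grow.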



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/SortBufferAccumulator.java:
##########
@@ -94,29 +99,58 @@ public class SortBufferAccumulator implements BufferAccumulator {
     @Nullable
     private TriConsumer<TieredStorageSubpartitionId, Buffer, Integer> accumulatedBufferFlusher;
 
+    /**
+     * An executor to periodically check the size of buffer pool. If the size is changed, the
+     * accumulated buffers should be flushed to release the buffers.
+     */
+    private final ScheduledExecutorService periodicalAccumulatorFlusher =
+            Executors.newSingleThreadScheduledExecutor(
+                    new ExecutorThreadFactory("hybrid-shuffle-periodical-accumulator-flusher"));
+
+    private final long poolSizeCheckInterval;
+
+    private AtomicInteger poolSize;
+
     /** Whether the current {@link DataBuffer} is a broadcast sort buffer. */
     private boolean isBroadcastDataBuffer;
 
     public SortBufferAccumulator(
             int numSubpartitions,
-            int numBuffers,
+            int numExpectedBuffers,
             int bufferSizeBytes,
+            long poolSizeCheckInterval,
             TieredStorageMemoryManager memoryManager,
             boolean isPartialRecordAllowed) {
         this.numSubpartitions = numSubpartitions;
         this.bufferSizeBytes = bufferSizeBytes;
-        this.numBuffers = numBuffers;
+        this.numExpectedBuffers = numExpectedBuffers;
+        this.poolSizeCheckInterval = poolSizeCheckInterval;
         this.memoryManager = memoryManager;
         this.isPartialRecordAllowed = isPartialRecordAllowed;
     }
 
     @Override
     public void setup(TriConsumer<TieredStorageSubpartitionId, Buffer, Integer> bufferFlusher) {
         this.accumulatedBufferFlusher = bufferFlusher;
+        this.poolSize = new AtomicInteger(memoryManager.getBufferPoolSize());
+
+        if (poolSizeCheckInterval > 0) {
+            periodicalAccumulatorFlusher.scheduleAtFixedRate(
+                    () -> {
+                        int newSize = this.memoryManager.getBufferPoolSize();
+                        int oldSize = poolSize.getAndSet(newSize);
+                        if (oldSize != newSize) {

Review Comment:
   I'm not sure we should always flush when `oldSize != newSize`. What if the pool size increases, or decreases but still leaves plenty of space?
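
   One possible condition, as a sketch only. `FlushDecisionSketch` and the `buffersInUse` parameter are hypothetical, and the threshold (flush only when the accumulator holds more buffers than the shrunken pool allows) is my assumption of a reasonable rule, not something the PR defines.

```java
/** Hypothetical sketch of a narrower flush condition; not Flink code. */
final class FlushDecisionSketch {

    /**
     * Flush only when the pool actually shrank and the buffers currently held
     * no longer fit into the new pool size.
     */
    static boolean shouldFlush(int oldPoolSize, int newPoolSize, int buffersInUse) {
        boolean shrunk = newPoolSize < oldPoolSize;
        return shrunk && buffersInUse > newPoolSize;
    }
}
```

   With this rule, a growing pool, or a shrinking pool that still has headroom, would not trigger a flush.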



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -435,6 +438,17 @@ public class NettyShuffleEnvironmentOptions {
                                     + HYBRID_SHUFFLE_NEW_MODE_OPTION_NAME
                                     + " is true, the remote storage will be disabled.");
 
+    /** The option to enable the memory-safe mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_MEMORY_SAFE_MODE =
+            key(HYBRID_SHUFFLE_MEMORY_SAFE_MODE_OPTION_NAME)
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "The option is used to enable the memory-safe mode of hybrid shuffle, which"
+                                    + "makes the shuffle can work with a little memory regardless of the job topology and how many task are running on the task manager.");
+

Review Comment:
   The description should warn users about the potential performance impact.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -180,38 +191,48 @@ public class LocalBufferPool implements BufferPool {
      * with a minimal and maximal number of network buffers being available.
      *
      * @param networkBufferPool global network buffer pool to get buffers from
-     * @param numberOfRequiredMemorySegments minimum number of network buffers
+     * @param expectedNumberOfMemorySegments expected number of network buffers
+     * @param minNumberOfMemorySegments minimum number of network buffers
      * @param maxNumberOfMemorySegments maximum number of network buffers to allocate
      * @param numberOfSubpartitions number of subpartitions
      * @param maxBuffersPerChannel maximum number of buffers to use for each channel
      * @param maxOverdraftBuffersPerGate maximum number of overdraft buffers to use for each gate
      */
     LocalBufferPool(
             NetworkBufferPool networkBufferPool,
-            int numberOfRequiredMemorySegments,
+            int expectedNumberOfMemorySegments,
+            int minNumberOfMemorySegments,
             int maxNumberOfMemorySegments,
             int numberOfSubpartitions,
             int maxBuffersPerChannel,
             int maxOverdraftBuffersPerGate) {
         checkArgument(
-                numberOfRequiredMemorySegments > 0,
-                "Required number of memory segments (%s) should be larger than 0.",
-                numberOfRequiredMemorySegments);
+                minNumberOfMemorySegments > 0,
+                "Minimum number of memory segments (%s) should be larger than 0.",
+                minNumberOfMemorySegments);
 
         checkArgument(
-                maxNumberOfMemorySegments >= numberOfRequiredMemorySegments,
-                "Maximum number of memory segments (%s) should not be smaller than minimum (%s).",
+                expectedNumberOfMemorySegments >= minNumberOfMemorySegments,
+                "Minimum number of memory segments (%s) should not be larger than expected (%s).",
+                minNumberOfMemorySegments,
+                expectedNumberOfMemorySegments);
+
+        checkArgument(
+                maxNumberOfMemorySegments >= expectedNumberOfMemorySegments,
+                "Maximum number of memory segments (%s) should not be smaller than expected (%s).",
                 maxNumberOfMemorySegments,
-                numberOfRequiredMemorySegments);
+                expectedNumberOfMemorySegments);
 
         LOG.debug(
-                "Using a local buffer pool with {}-{} buffers",
-                numberOfRequiredMemorySegments,
+                "Using a local buffer pool with {}-{}-{} buffers",

Review Comment:
   The meaning of each number is unclear.
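
   For example, the message could label each value. This is a standalone sketch: `PoolLogMessageSketch` is hypothetical, it uses `String.format` rather than the logger so that it runs on its own, and the expected / min / max order is my assumption, since the arguments are not visible in this hunk.

```java
/** Hypothetical sketch of a self-describing log message; not Flink code. */
final class PoolLogMessageSketch {

    /** Builds a message in which every number carries its own label. */
    static String describe(int expected, int min, int max) {
        return String.format(
                "Using a local buffer pool with expected=%d, min=%d, max=%d buffers",
                expected, min, max);
    }
}
```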



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##########
@@ -79,7 +79,9 @@ public TieredStorageProducerClient(
         this.currentSubpartitionTierAgent = new TierProducerAgent[numSubpartitions];
 
         Arrays.fill(currentSubpartitionSegmentId, -1);
+    }
 
+    public void setUp() {

Review Comment:
   Why do we need this separate `setUp`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -153,6 +153,9 @@ public SingleInputGate create(
                 igdd.getConsumedPartitionType().isHybridResultPartition()
                         && tieredStorageConfiguration != null;
 
+        boolean enableTieredStorage = tieredStorageConfiguration != null;
+        boolean enableMemorySafeMode =

Review Comment:
   Never used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
