reswqa commented on code in PR #22975:
URL: https://github.com/apache/flink/pull/22975#discussion_r1270166549


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -122,6 +155,9 @@ public ResultPartitionFactory(
         this.hybridShuffleSpilledIndexRegionGroupSize = hybridShuffleSpilledIndexRegionGroupSize;
         this.hybridShuffleNumRetainedInMemoryRegionsMax =
                 hybridShuffleNumRetainedInMemoryRegionsMax;
+        this.tieredStorageConfiguration = tieredStorageConfiguration;
+        this.tieredStorageNettyService = tieredStorageNettyService;
+        this.tieredStorageResourceRegistry = tieredStorageResourceRegistry;

Review Comment:
   I wonder whether we can combine everything related to `tieredStorage` into a single container class and pass an `Optional<ThatClass>` to this `ResultPartitionFactory`.
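   One possible reading of this suggestion, sketched below: bundle the three tiered-storage fields into one holder and have the factory accept an `Optional` of it, so the legacy path passes `Optional.empty()`. The holder name `TieredStorageComponents` and the empty stand-in classes are hypothetical placeholders for the real Flink types in the diff, so this is an illustration under those assumptions, not the PR's code.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical empty stand-ins for the Flink types referenced in the diff.
final class TieredStorageConfiguration {}

final class TieredStorageNettyService {}

final class TieredStorageResourceRegistry {}

/** Hypothetical container bundling every tiered-storage dependency of the factory. */
final class TieredStorageComponents {
    private final TieredStorageConfiguration configuration;
    private final TieredStorageNettyService nettyService;
    private final TieredStorageResourceRegistry resourceRegistry;

    TieredStorageComponents(
            TieredStorageConfiguration configuration,
            TieredStorageNettyService nettyService,
            TieredStorageResourceRegistry resourceRegistry) {
        // The three parts only make sense together, so none of them may be null.
        this.configuration = Objects.requireNonNull(configuration);
        this.nettyService = Objects.requireNonNull(nettyService);
        this.resourceRegistry = Objects.requireNonNull(resourceRegistry);
    }

    TieredStorageConfiguration getConfiguration() { return configuration; }

    TieredStorageNettyService getNettyService() { return nettyService; }

    TieredStorageResourceRegistry getResourceRegistry() { return resourceRegistry; }
}

/** Sketch of a factory holding one optional container instead of three separate fields. */
final class ResultPartitionFactorySketch {
    private final Optional<TieredStorageComponents> tieredStorage;

    ResultPartitionFactorySketch(Optional<TieredStorageComponents> tieredStorage) {
        this.tieredStorage = Objects.requireNonNull(tieredStorage);
    }

    /** Tiered storage is used only when the components were supplied. */
    boolean usesTieredStorage() {
        return tieredStorage.isPresent();
    }
}
```

   Flink's code style guide has rules on where `Optional` versus `@Nullable` should be used, so whether the field itself holds the `Optional` or a `@Nullable` reference is worth checking against it.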



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -28,6 +28,12 @@
 @PublicEvolving
 public class NettyShuffleEnvironmentOptions {
 
+    private static final String HYBRID_SHUFFLE_NEW_MODE_OPTION_NAME =
+            "taskmanager.network.hybrid-shuffle.enable-new-mode";
+
+    private static final String HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH_OPTION_NAME =
+            "taskmanager.network.hybrid-shuffle.remote.path";

Review Comment:
   It seems odd to define these key names as separate constants when none of the other options do, so I suggest keeping them consistent.
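   A possible consistent shape, sketched with a minimal hypothetical stand-in for Flink's `ConfigOption` so it compiles on its own: write each key inline like the other options, and where a description needs another option's key, call that option's `key()` (the real `ConfigOption` exposes `key()`). One caveat: static fields are initialized in declaration order, so the referenced option has to be declared before the one whose description uses it; otherwise the compiler rejects the simple-name use as an illegal forward reference, or a qualified use sees a field that is still `null`.

```java
/** Hypothetical, minimal stand-in for Flink's ConfigOption: a key, a default and a description. */
final class SimpleOption<T> {
    private final String key;
    private final T defaultValue;
    private final String description;

    SimpleOption(String key, T defaultValue, String description) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.description = description;
    }

    String key() { return key; }

    T defaultValue() { return defaultValue; }

    String description() { return description; }
}

final class HybridShuffleOptionsSketch {
    // Declared first: static fields are initialized in textual order, so the
    // option below can safely call REMOTE_STORAGE_BASE_PATH.key().
    static final SimpleOption<String> REMOTE_STORAGE_BASE_PATH =
            new SimpleOption<>(
                    "taskmanager.network.hybrid-shuffle.remote.path",
                    null,
                    "(description omitted in this sketch)");

    // Key written inline like the other options; the (abbreviated) description
    // reuses the other option's key instead of a separate String constant.
    static final SimpleOption<Boolean> ENABLE_NEW_MODE =
            new SimpleOption<>(
                    "taskmanager.network.hybrid-shuffle.enable-new-mode",
                    true,
                    "Remote storage is only supported when "
                            + REMOTE_STORAGE_BASE_PATH.key()
                            + " is configured.");
}
```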



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -18,58 +18,435 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.SortBufferAccumulator;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskTierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.RemoteTierFactory;
 
-import javax.annotation.Nullable;
-
-import java.util.Arrays;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 
 /** Configurations for the Tiered Storage. */
 public class TieredStorageConfiguration {
 
-    // TODO, after implementing the tier factory, add appreciate implementations to the array.
-    private static final TierFactory[] DEFAULT_MEMORY_DISK_TIER_FACTORIES = new TierFactory[0];
+    private static final String DEFAULT_REMOTE_STORAGE_BASE_PATH = null;
+
+    private static final int DEFAULT_TIERED_STORAGE_BUFFER_SIZE = 32 * 1024;
+
+    private static final int DEFAULT_MEMORY_TIER_EXCLUSIVE_BUFFERS = 100;
+
+    private static final int DEFAULT_DISK_TIER_EXCLUSIVE_BUFFERS = 1;
+
+    private static final int DEFAULT_REMOTE_TIER_EXCLUSIVE_BUFFERS = 1;
+
+    private static final int DEFAULT_NUM_BUFFERS_USE_SORT_ACCUMULATOR_THRESHOLD = 512;
+
+    private static final int DEFAULT_MEMORY_TIER_NUM_BYTES_PER_SEGMENT = 320 * 1024;
+
+    private static final int DEFAULT_DISK_TIER_NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024;
+
+    private static final int DEFAULT_REMOTE_TIER_NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024;
+
+    private static final float DEFAULT_NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.5f;
+
+    private static final int DEFAULT_DISK_TIER_MAX_BUFFERS_READ_AHEAD = 5;
+
+    private static final Duration DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5);
+
+    private static final float DEFAULT_MIN_RESERVE_SPACE_FRACTION = 0.05f;
+
+    private static final int DEFAULT_REGION_GROUP_SIZE_IN_BYTES = 1024;
+
+    private static final long DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY = 1024 * 1024L;
+
+    private final String remoteStorageBasePath;
 
-    /** If the remote storage tier is not used, this field may be null. */
-    @Nullable private final String remoteStorageBasePath;
+    private final int tieredStorageBufferSize;
 
-    public TieredStorageConfiguration(@Nullable String remoteStorageBasePath) {
+    private final int memoryTierExclusiveBuffers;
+
+    private final int diskTierExclusiveBuffers;
+
+    private final int remoteTierExclusiveBuffers;
+
+    private final int accumulatorExclusiveBuffers;
+
+    private final int memoryTierNumBytesPerSegment;
+
+    private final int diskTierNumBytesPerSegment;
+
+    private final int remoteTierNumBytesPerSegment;
+
+    private final float numBuffersTriggerFlushRatio;
+
+    private final int diskIOSchedulerMaxBuffersReadAhead;
+
+    private final Duration diskIOSchedulerRequestTimeout;
+
+    private final float minReserveSpaceFraction;

Review Comment:
   ```suggestion
       private final float minReserveDiskSpaceFraction;
   ```
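   To illustrate why the `Disk` qualifier helps: with the default of `0.05f`, the field reads naturally as "keep at least 5% of the disk free". The sketch below assumes that interpretation, which the diff itself does not spell out; the class and the helper name `hasEnoughDiskSpace` are hypothetical. It also shows how the check could be fed from `java.io.File#getUsableSpace()` and `File#getTotalSpace()`.

```java
import java.io.File;

final class DiskSpaceCheckSketch {
    // Mirrors DEFAULT_MIN_RESERVE_SPACE_FRACTION from the diff.
    static final float DEFAULT_MIN_RESERVE_DISK_SPACE_FRACTION = 0.05f;

    /**
     * Hypothetical check (assumed semantics): writing to disk is allowed only while the
     * usable space stays above minReserveDiskSpaceFraction of the total disk space.
     */
    static boolean hasEnoughDiskSpace(
            long usableBytes, long totalBytes, float minReserveDiskSpaceFraction) {
        if (totalBytes <= 0) {
            // File#getTotalSpace() returns 0 for a path that is not a valid partition.
            return false;
        }
        double reservedBytes = (double) totalBytes * minReserveDiskSpaceFraction;
        return usableBytes > reservedBytes;
    }

    /** Convenience overload reading the current numbers for a spill directory. */
    static boolean hasEnoughDiskSpace(File dir, float minReserveDiskSpaceFraction) {
        return hasEnoughDiskSpace(
                dir.getUsableSpace(), dir.getTotalSpace(), minReserveDiskSpaceFraction);
    }
}
```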



##########
flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/NettyShuffleUtilsTest.java:
##########
@@ -99,6 +100,8 @@ public void testComputeRequiredNetworkBuffers() throws Exception {
                         maxRequiredBuffersPerGate,
                         sortShuffleMinParallelism,
                         numSortShuffleMinBuffers,
+                        false,
+                        tieredStoreExclusiveBuffers,

Review Comment:
   The changes in the last two commits need test coverage, especially in `GateBuffersSpecTest`.



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -381,6 +393,38 @@ public class NettyShuffleEnvironmentOptions {
                                     + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
                                     + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
 
+    /** The option to enable the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE =
+            ConfigOptions.key(HYBRID_SHUFFLE_NEW_MODE_OPTION_NAME)
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "The option is used to enable the new mode of hybrid shuffle, which has resolved existing issues in the legacy mode. First, the new mode "
+                                    + "uses less required network memory. Second, the new mode can store shuffle data in remote storage when the disk space is not "
+                                    + "enough, which could avoid insufficient disk space errors and is only supported when "
+                                    + HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH_OPTION_NAME
+                                    + " is configured. The new mode is currently in an experimental phase. The option can be set to false to fallback to the legacy "
+                                    + "mode if the new mode is unstable. Once the new mode reaches a stable state, the legacy mode and the option will be removed.");

Review Comment:
   ```suggestion
                                       + " Once the new mode reaches a stable state, the legacy mode as well as this option will be removed.");
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java:
##########
@@ -67,6 +75,17 @@ public NettyShuffleMaster(Configuration conf) {
                 conf.getInteger(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
         networkBufferSize = ConfigurationParserUtils.getPageSize(conf);
 
+        if (conf.getBoolean(NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE)) {
+            tieredStorageConfiguration =
+                    TieredStorageConfiguration.builder(
+                                    conf.getString(NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH))
+                            .build();
+            tieredInternalShuffleMaster = new TieredInternalShuffleMaster(conf);
+        } else {
+            tieredStorageConfiguration = null;
+            tieredInternalShuffleMaster = null;
+        }

Review Comment:
   This is redundant.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -18,58 +18,435 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.SortBufferAccumulator;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskTierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.RemoteTierFactory;
 
-import javax.annotation.Nullable;
-
-import java.util.Arrays;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 
 /** Configurations for the Tiered Storage. */
 public class TieredStorageConfiguration {

Review Comment:
   The name of the second commit, `Enable the TieredStorageConfiguration`, is a bit unclear. I suggest renaming it to `Enable TieredStorageResultPartition` and squashing it with the first commit.



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -381,6 +393,38 @@ public class NettyShuffleEnvironmentOptions {
                                     + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks"
                                     + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
 
+    /** The option to enable the new mode of hybrid shuffle. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
+    public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE =
+            ConfigOptions.key(HYBRID_SHUFFLE_NEW_MODE_OPTION_NAME)
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "The option is used to enable the new mode of hybrid shuffle, which has resolved existing issues in the legacy mode. First, the new mode "
+                                    + "uses less required network memory. Second, the new mode can store shuffle data in remote storage when the disk space is not "
+                                    + "enough, which could avoid insufficient disk space errors and is only supported when "
+                                    + HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH_OPTION_NAME
+                                    + " is configured. The new mode is currently in an experimental phase. The option can be set to false to fallback to the legacy "

Review Comment:
   ```suggestion
                                       + " is configured. The new mode is currently in an experimental phase. It can be set to false to fall back to the legacy mode if something unexpected happens."
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java:
##########
@@ -164,7 +164,10 @@ private static ResultPartition createResultPartition(
                         false,
                         0,
                         256,
-                        Long.MAX_VALUE);
+                        Long.MAX_VALUE,

Review Comment:
   We do need some tests for tiered storage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
