xintongsong commented on code in PR #22975: URL: https://github.com/apache/flink/pull/22975#discussion_r1259626675
########## flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java: ########## @@ -49,7 +55,9 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> private final int networkBufferSize; - // TODO, WIP: create tiered internal shuffle master after enabling the tiered storage. + @Nullable private TieredInternalShuffleMaster tieredInternalShuffleMaster; + + @Nullable private TieredStorageConfiguration tieredStorageConfiguration; Review Comment: Should be final. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java: ########## @@ -40,7 +41,9 @@ public class TieredInternalShuffleMaster { public TieredInternalShuffleMaster(Configuration conf) { TieredStorageConfiguration tieredStorageConfiguration = - TieredStorageConfiguration.fromConfiguration(conf); + TieredStorageConfiguration.builder( + conf.getString(NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH)) + .build(); Review Comment: This change seems to belong to the first commit. Otherwise, there will be a compile error. ########## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ########## @@ -379,6 +380,28 @@ public class NettyShuffleEnvironmentOptions { + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. 
The timeout breaks" + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers."); + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) + @Experimental + @Internal + public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_TIERED_STORAGE = + ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-tiered-storage") + .booleanType() + .defaultValue(false) + .withDescription( + "The option is used to enable tiered storage architecture for hybrid shuffle mode."); + + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) + @Experimental + @Internal + public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH = + key("taskmanager.network.hybrid-shuffle.remote.path") + .stringType() + .noDefaultValue() + .withDescription( + "The base home path of remote storage for remote tier. If the option is configured, " + + "Hybrid Shuffle will use the remote storage path as a supplement to the" + + "local disks. If not configured, the remote storage will not be used."); Review Comment: 1. This is not internal. 2. JavaDoc is missing. 3. Shall we expose the concept `remote tier` to users at all? What are the minimum necessary things that a user needs to understand? ########## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ########## @@ -379,6 +380,28 @@ public class NettyShuffleEnvironmentOptions { + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. 
The timeout breaks" + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers."); + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) + @Experimental + @Internal + public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_TIERED_STORAGE = + ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-tiered-storage") + .booleanType() + .defaultValue(false) + .withDescription( + "The option is used to enable tiered storage architecture for hybrid shuffle mode."); Review Comment: 1. This is not internal. 2. JavaDoc is missing. 3. What is "tiered storage architecture" from the user's perspective? How is it different from the original architecture? What does the user need to know about this? 4. Why is the default value `false`? How can a user decide which mode to use? Would anyone enable it at all? ########## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ########## @@ -379,6 +380,28 @@ public class NettyShuffleEnvironmentOptions { + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. 
The timeout breaks" + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers."); + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) + @Experimental + @Internal + public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_TIERED_STORAGE = + ConfigOptions.key("taskmanager.network.hybrid-shuffle.enable-tiered-storage") + .booleanType() + .defaultValue(false) + .withDescription( + "The option is used to enable tiered storage architecture for hybrid shuffle mode."); + + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) + @Experimental + @Internal + public static final ConfigOption<String> NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH = + key("taskmanager.network.hybrid-shuffle.remote.path") + .stringType() + .noDefaultValue() + .withDescription( + "The base home path of remote storage for remote tier. If the option is configured, " + + "Hybrid Shuffle will use the remote storage path as a supplement to the" + + "local disks. If not configured, the remote storage will not be used."); + Review Comment: Is there any config option that only the old mode of hybrid shuffle uses but not the new one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org