WencongLiu commented on code in PR #22975: URL: https://github.com/apache/flink/pull/22975#discussion_r1270307929
########## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ########## @@ -381,6 +393,38 @@ public class NettyShuffleEnvironmentOptions { + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks" + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers."); + /** The option to enable the new mode of hybrid shuffle. */ + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) + @Experimental + public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE = + ConfigOptions.key(HYBRID_SHUFFLE_NEW_MODE_OPTION_NAME) + .booleanType() + .defaultValue(true) + .withDescription( + "The option is used to enable the new mode of hybrid shuffle, which has resolved existing issues in the legacy mode. First, the new mode " + + "uses less required network memory. Second, the new mode can store shuffle data in remote storage when the disk space is not " + + "enough, which could avoid insufficient disk space errors and is only supported when " + + HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH_OPTION_NAME + + " is configured. The new mode is currently in an experimental phase. The option can be set to false to fallback to the legacy " + + "mode if the new mode is unstable. Once the new mode reaches a stable state, the legacy mode and the option will be removed."); Review Comment: Fixed. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java: ########## @@ -18,58 +18,435 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.SortBufferAccumulator; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskTierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.RemoteTierFactory; -import javax.annotation.Nullable; - -import java.util.Arrays; +import java.time.Duration; +import java.util.ArrayList; import java.util.List; /** Configurations for the Tiered Storage. */ public class TieredStorageConfiguration { - // TODO, after implementing the tier factory, add appreciate implementations to the array. - private static final TierFactory[] DEFAULT_MEMORY_DISK_TIER_FACTORIES = new TierFactory[0]; + private static final String DEFAULT_REMOTE_STORAGE_BASE_PATH = null; + + private static final int DEFAULT_TIERED_STORAGE_BUFFER_SIZE = 32 * 1024; + + private static final int DEFAULT_MEMORY_TIER_EXCLUSIVE_BUFFERS = 100; + + private static final int DEFAULT_DISK_TIER_EXCLUSIVE_BUFFERS = 1; + + private static final int DEFAULT_REMOTE_TIER_EXCLUSIVE_BUFFERS = 1; + + private static final int DEFAULT_NUM_BUFFERS_USE_SORT_ACCUMULATOR_THRESHOLD = 512; + + private static final int DEFAULT_MEMORY_TIER_NUM_BYTES_PER_SEGMENT = 320 * 1024; + + private static final int DEFAULT_DISK_TIER_NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024; + + private static final int DEFAULT_REMOTE_TIER_NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024; + + private static final float DEFAULT_NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.5f; + + private static final int DEFAULT_DISK_TIER_MAX_BUFFERS_READ_AHEAD = 5; + + private static final Duration DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5); + + private static final float DEFAULT_MIN_RESERVE_SPACE_FRACTION = 0.05f; + + private static final int DEFAULT_REGION_GROUP_SIZE_IN_BYTES = 1024; + + private static final long DEFAULT_MAX_REGION_NUM_RETAINED_IN_MEMORY = 1024 * 1024L; + + private final String remoteStorageBasePath; - /** If the remote storage tier is not used, this field may be null. */ - @Nullable private final String remoteStorageBasePath; + private final int tieredStorageBufferSize; - public TieredStorageConfiguration(@Nullable String remoteStorageBasePath) { + private final int memoryTierExclusiveBuffers; + + private final int diskTierExclusiveBuffers; + + private final int remoteTierExclusiveBuffers; + + private final int accumulatorExclusiveBuffers; + + private final int memoryTierNumBytesPerSegment; + + private final int diskTierNumBytesPerSegment; + + private final int remoteTierNumBytesPerSegment; + + private final float numBuffersTriggerFlushRatio; + + private final int diskIOSchedulerMaxBuffersReadAhead; + + private final Duration diskIOSchedulerRequestTimeout; + + private final float minReserveSpaceFraction; Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org