xintongsong commented on code in PR #22975: URL: https://github.com/apache/flink/pull/22975#discussion_r1263432147
########## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ########## @@ -379,6 +391,40 @@ public class NettyShuffleEnvironmentOptions { + "tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks" + "the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers."); + /** The option to enable the new mode of hybrid shuffle. */ + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) + @Experimental + public static final ConfigOption<Boolean> NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE = + ConfigOptions.key(HYBRID_SHUFFLE_NEW_MODE_OPTION_NAME) + .booleanType() + .defaultValue(true) + .withDescription( + "The option is used to enable the new mode of hybrid shuffle, which has resolved existing issues in the legacy mode. First, the new mode " + + "uses less required network memory. For large parallelism jobs, less network memory is required. Second, the new mode " + + "can store shuffle data in remote storage when the disk space is not enough, which could avoid insufficient disk space " + + "errors and is only supported when " + + HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH_OPTION_NAME + + " is configured. " + + "The option is set true by default, but the new mode can fallback to legacy mode by setting this option false. " + + "In the future, the new mode will become the default implementation of hybrid shuffle and this option will be removed."); Review Comment: 1. "First, the new mode uses less required network memory. For large parallelism jobs, less network memory is required." These two sentences say the same thing; they are redundant. 2. "but the new mode can fallback to legacy mode by setting this option false." Why would users want to fall back to the legacy mode? 3. "In the future, the new mode will become the default implementation of hybrid shuffle and this option will be removed." The new mode is already the default implementation, so this sentence is outdated. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java: ########## @@ -18,58 +18,408 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.SortBufferAccumulator; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskTierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.RemoteTierFactory; -import javax.annotation.Nullable; - -import java.util.Arrays; +import java.time.Duration; +import java.util.ArrayList; import java.util.List; /** Configurations for the Tiered Storage. */ public class TieredStorageConfiguration { - // TODO, after implementing the tier factory, add appreciate implementations to the array. 
- private static final TierFactory[] DEFAULT_MEMORY_DISK_TIER_FACTORIES = new TierFactory[0]; + private static final String DEFAULT_REMOTE_STORAGE_BASE_PATH = null; + + private static final int DEFAULT_TIERED_STORAGE_BUFFER_SIZE = 32 * 1024; + + private static final int DEFAULT_MEMORY_TIER_EXCLUSIVE_BUFFERS = 100; + + private static final int DEFAULT_DISK_TIER_EXCLUSIVE_BUFFERS = 1; + + private static final int DEFAULT_REMOTE_TIER_EXCLUSIVE_BUFFERS = 1; + + private static final int DEFAULT_NUM_BUFFERS_USE_SORT_ACCUMULATOR_THRESHOLD = 512; + + private static final int DEFAULT_MEMORY_TIER_NUM_BYTES_PER_SEGMENT = 320 * 1024; + + private static final int DEFAULT_DISK_TIER_NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024; + + private static final int DEFAULT_REMOTE_TIER_NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024; + + private static final float DEFAULT_NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.5f; + + private static final int DEFAULT_DISK_TIER_MAX_BUFFERS_READ_AHEAD = 5; + + private static final Duration DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5); + + private static final float DEFAULT_MIN_RESERVE_SPACE_FRACTION = 0.05f; + + private final String remoteStorageBasePath; - /** If the remote storage tier is not used, this field may be null. 
*/ - @Nullable private final String remoteStorageBasePath; + private final int tieredStorageBufferSize; - public TieredStorageConfiguration(@Nullable String remoteStorageBasePath) { + private final int memoryTierExclusiveBuffers; + + private final int diskTierExclusiveBuffers; + + private final int remoteTierExclusiveBuffers; + + private final int accumulatorExclusiveBuffers; + + private final int memoryTierNumBytesPerSegment; + + private final int diskTierNumBytesPerSegment; + + private final int remoteTierNumBytesPerSegment; + + private final float numBuffersTriggerFlushRatio; + + private final int diskIOSchedulerMaxBuffersReadAhead; + + private final Duration diskIOSchedulerRequestTimeout; + + private final float minReserveSpaceFraction; + + private final List<TierFactory> tierFactories; + + public TieredStorageConfiguration( + String remoteStorageBasePath, + int tieredStorageBufferSize, + int memoryTierExclusiveBuffers, + int diskTierExclusiveBuffers, + int remoteTierExclusiveBuffers, + int accumulatorExclusiveBuffers, + int memoryTierNumBytesPerSegment, + int diskTierNumBytesPerSegment, + int remoteTierNumBytesPerSegment, + float numBuffersTriggerFlushRatio, + int diskIOSchedulerMaxBuffersReadAhead, + Duration diskIOSchedulerRequestTimeout, + float minReserveSpaceFraction, + List<TierFactory> tierFactories) { this.remoteStorageBasePath = remoteStorageBasePath; + this.tieredStorageBufferSize = tieredStorageBufferSize; + this.memoryTierExclusiveBuffers = memoryTierExclusiveBuffers; + this.diskTierExclusiveBuffers = diskTierExclusiveBuffers; + this.remoteTierExclusiveBuffers = remoteTierExclusiveBuffers; + this.accumulatorExclusiveBuffers = accumulatorExclusiveBuffers; + this.memoryTierNumBytesPerSegment = memoryTierNumBytesPerSegment; + this.diskTierNumBytesPerSegment = diskTierNumBytesPerSegment; + this.remoteTierNumBytesPerSegment = remoteTierNumBytesPerSegment; + this.numBuffersTriggerFlushRatio = numBuffersTriggerFlushRatio; + 
this.diskIOSchedulerMaxBuffersReadAhead = diskIOSchedulerMaxBuffersReadAhead; + this.diskIOSchedulerRequestTimeout = diskIOSchedulerRequestTimeout; + this.minReserveSpaceFraction = minReserveSpaceFraction; + this.tierFactories = tierFactories; } - public List<TierFactory> getTierFactories() { - return Arrays.asList(DEFAULT_MEMORY_DISK_TIER_FACTORIES); + public static Builder builder(String remoteStorageBasePath) { + return new TieredStorageConfiguration.Builder() + .setRemoteStorageBasePath(remoteStorageBasePath); } - @Nullable + public static Builder builder(int tieredStorageBufferSize, String remoteStorageBasePath) { + return new TieredStorageConfiguration.Builder() + .setTieredStorageBufferSize(tieredStorageBufferSize) + .setRemoteStorageBasePath(remoteStorageBasePath); + } + + /** + * Get the base path on remote storage. + * + * @return string if the remote storage path is configured otherwise null. + */ public String getRemoteStorageBasePath() { return remoteStorageBasePath; } - public static TieredStorageConfiguration.Builder builder() { - return new TieredStorageConfiguration.Builder(); + /** + * Get the buffer size in tiered storage. + * + * @return the buffer size. + */ + public int getTieredStorageBufferSize() { + return tieredStorageBufferSize; + } + + /** + * Get exclusive buffer number of memory tier. + * + * @return the buffer number. + */ + public int getMemoryTierExclusiveBuffers() { + return memoryTierExclusiveBuffers; + } + + /** + * Get exclusive buffer number of disk tier. + * + * @return the buffer number. + */ + public int getDiskTierExclusiveBuffers() { + return diskTierExclusiveBuffers; + } + + /** + * Get exclusive buffer number of remote tier. + * + * @return the buffer number. + */ + public int getRemoteTierExclusiveBuffers() { + return remoteTierExclusiveBuffers; } - /** Builder for {@link TieredStorageConfiguration}. */ + /** + * Get exclusive buffer number of accumulator. 
+ * + * <p>The buffer number is used to compare with the subpartition number to determine the type of + * {@link BufferAccumulator}. + * + * <p>If the exclusive buffer number is larger than (subpartitionNum + 1), the accumulator will + * use {@link HashBufferAccumulator}. If the exclusive buffer number is equal to or smaller than + * (subpartitionNum + 1), the accumulator will use {@link SortBufferAccumulator} + * + * @return the buffer number. + */ + public int getAccumulatorExclusiveBuffers() { + return accumulatorExclusiveBuffers; + } + + /** + * Get the segment size of memory tier. + * + * @return segment size. + */ + public int getMemoryTierNumBytesPerSegment() { + return memoryTierNumBytesPerSegment; + } + + /** + * Get the segment size of disk tier. + * + * @return segment size. + */ + public int getDiskTierNumBytesPerSegment() { + return diskTierNumBytesPerSegment; + } + + /** + * Get the segment size of remote tier. + * + * @return segment size. + */ + public int getRemoteTierNumBytesPerSegment() { + return remoteTierNumBytesPerSegment; + } + + /** + * When the number of buffers that have been requested exceeds this threshold, trigger the + * flushing operation in each {@link TierProducerAgent}. + * + * @return flush ratio. + */ + public float getNumBuffersTriggerFlushRatio() { + return numBuffersTriggerFlushRatio; + } + + /** + * The number of buffers to read ahead at most for each subpartition in {@link DiskIOScheduler}, + * which can be used to prevent other consumers from starving. + * + * @return buffer number. + */ + public int getDiskIOSchedulerMaxBuffersReadAhead() { + return diskIOSchedulerMaxBuffersReadAhead; + } + + /** + * Maximum time to wait when requesting read buffers from the buffer pool before throwing an + * exception in {@link DiskIOScheduler}. + * + * @return timeout duration. 
+ */ + public Duration getDiskIOSchedulerBufferRequestTimeout() { + return diskIOSchedulerRequestTimeout; + } + + /** + * Minimum reserved disk space fraction in disk tier. + * + * @return the fraction. + */ + public float getMinReserveSpaceFraction() { + return minReserveSpaceFraction; + } + + /** + * Get the total exclusive buffer number. + * + * @return the total exclusive buffer number. + */ + public int getTotalExclusiveBufferNum() { + return accumulatorExclusiveBuffers + + memoryTierExclusiveBuffers + + diskTierExclusiveBuffers + + (remoteStorageBasePath == null ? 0 : remoteTierExclusiveBuffers); + } + + /** + * Get exclusive buffer number of each tier. + * + * @return buffer number of each tier. + */ + public List<Integer> getEachTierExclusiveBufferNum() { + List<Integer> tierExclusiveBuffers = new ArrayList<>(); + tierExclusiveBuffers.add(memoryTierExclusiveBuffers); + tierExclusiveBuffers.add(diskTierExclusiveBuffers); + if (remoteStorageBasePath != null) { + tierExclusiveBuffers.add(remoteTierExclusiveBuffers); + } + return tierExclusiveBuffers; + } + + public List<TierFactory> getTierFactories() { + return tierFactories; + } Review Comment: These are inconsistent with each other. `tierFactories` may or may not be the default factories, while the number of buffers for each tier is hard-coded. I think we should at least add a `createDefaultTierExclusiveBufferNums` method to the builder, so that the default list of buffer numbers is used only when the default factories are used. A better way might be to have a `createDefaultTierFactoriesAndMemorySpecs` method that decides the hard-coded factories and memory parameters at once. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org