TanYuxin-tyx commented on code in PR #22975: URL: https://github.com/apache/flink/pull/22975#discussion_r1269371029
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java: ########## @@ -83,6 +106,13 @@ public class ResultPartitionFactory { private final int maxOverdraftBuffersPerGate; + // The following attributes will be null if tiered storage shuffle is disabled. Review Comment: Use /* */ doc instead here. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java: ########## @@ -101,9 +101,10 @@ public class SingleInputGateFactory { private final BufferDebloatConfiguration debloatConfiguration; - @Nullable - private final TieredStorageConfiguration - tieredStorageConfiguration; // is null if tiered storage shuffle is disabled. + // The following attributes will be null if tiered storage shuffle is disabled. Review Comment: Use `/* */` instead. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java: ########## @@ -266,6 +357,74 @@ private HybridShuffleConfiguration getHybridShuffleConfiguration( .build(); } + private BufferAccumulator createBufferAccumulator( + int numSubpartitions, + int accumulatorExclusiveBufferNum, Review Comment: ditto, `numAccumulatorExclusiveBuffers` ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java: ########## @@ -227,23 +263,78 @@ public ResultPartition create( } } else if (type == ResultPartitionType.HYBRID_FULL || type == ResultPartitionType.HYBRID_SELECTIVE) { - partition = - new HsResultPartition( - taskNameWithSubtaskAndId, - partitionIndex, - id, - type, - subpartitions.length, - maxParallelism, - batchShuffleReadBufferPool, - batchShuffleReadIOExecutor, - partitionManager, - channelManager.createChannel().getPath(), - networkBufferSize, - getHybridShuffleConfiguration(numberOfSubpartitions, type), - bufferCompressor, - isBroadcast, - bufferPoolFactory); + if (tieredStorageConfiguration != null) { + // 
Create memory manager. + TieredStorageMemoryManager memoryManager = + new TieredStorageMemoryManagerImpl( + checkNotNull(tieredStorageConfiguration) + .getNumBuffersTriggerFlushRatio(), + true); + + // Create buffer accumulator. + int accumulatorExclusiveBufferNum = Review Comment: Maybe use `numAccumulatorExclusiveBuffers` to replace `accumulatorExclusiveBufferNum`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java: ########## @@ -18,58 +18,435 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.SortBufferAccumulator; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskTierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.RemoteTierFactory; -import javax.annotation.Nullable; - -import java.util.Arrays; +import java.time.Duration; +import java.util.ArrayList; import java.util.List; /** Configurations for the Tiered Storage. */ public class TieredStorageConfiguration { - // TODO, after implementing the tier factory, add appreciate implementations to the array. 
- private static final TierFactory[] DEFAULT_MEMORY_DISK_TIER_FACTORIES = new TierFactory[0]; + private static final String DEFAULT_REMOTE_STORAGE_BASE_PATH = null; + + private static final int DEFAULT_TIERED_STORAGE_BUFFER_SIZE = 32 * 1024; + + private static final int DEFAULT_MEMORY_TIER_EXCLUSIVE_BUFFERS = 100; + + private static final int DEFAULT_DISK_TIER_EXCLUSIVE_BUFFERS = 1; + + private static final int DEFAULT_REMOTE_TIER_EXCLUSIVE_BUFFERS = 1; + + private static final int DEFAULT_NUM_BUFFERS_USE_SORT_ACCUMULATOR_THRESHOLD = 512; + + private static final int DEFAULT_MEMORY_TIER_NUM_BYTES_PER_SEGMENT = 320 * 1024; + + private static final int DEFAULT_DISK_TIER_NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024; + + private static final int DEFAULT_REMOTE_TIER_NUM_BYTES_PER_SEGMENT = 8 * 1024 * 1024; + + private static final float DEFAULT_NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.5f; Review Comment: Here I have a concern. A ratio of 0.5 may be too low for triggering the flush process; we'd better change it to at least 0.6. A trigger ratio that is too low will generate too many small regions, which is bad for sequential read performance. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java: ########## @@ -227,23 +263,78 @@ public ResultPartition create( } } else if (type == ResultPartitionType.HYBRID_FULL || type == ResultPartitionType.HYBRID_SELECTIVE) { - partition = - new HsResultPartition( - taskNameWithSubtaskAndId, - partitionIndex, - id, - type, - subpartitions.length, - maxParallelism, - batchShuffleReadBufferPool, - batchShuffleReadIOExecutor, - partitionManager, - channelManager.createChannel().getPath(), - networkBufferSize, - getHybridShuffleConfiguration(numberOfSubpartitions, type), - bufferCompressor, - isBroadcast, - bufferPoolFactory); + if (tieredStorageConfiguration != null) { + // Create memory manager. 
+ TieredStorageMemoryManager memoryManager = + new TieredStorageMemoryManagerImpl( + checkNotNull(tieredStorageConfiguration) + .getNumBuffersTriggerFlushRatio(), + true); + + // Create buffer accumulator. + int accumulatorExclusiveBufferNum = Review Comment: Maybe use `numAccumulatorExclusiveBuffers` to replace `accumulatorExclusiveBufferNum`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org