TanYuxin-tyx commented on code in PR #22975:
URL: https://github.com/apache/flink/pull/22975#discussion_r1269371029


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -83,6 +106,13 @@ public class ResultPartitionFactory {
 
     private final int maxOverdraftBuffersPerGate;
 
+    // The following attributes will be null if tiered storage shuffle is 
disabled.

Review Comment:
   Use a `/* */` block comment here instead.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -101,9 +101,10 @@ public class SingleInputGateFactory {
 
     private final BufferDebloatConfiguration debloatConfiguration;
 
-    @Nullable
-    private final TieredStorageConfiguration
-            tieredStorageConfiguration; // is null if tiered storage shuffle 
is disabled.
+    // The following attributes will be null if tiered storage shuffle is 
disabled.

Review Comment:
   Use `/* */` instead.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -266,6 +357,74 @@ private HybridShuffleConfiguration 
getHybridShuffleConfiguration(
                 .build();
     }
 
+    private BufferAccumulator createBufferAccumulator(
+            int numSubpartitions,
+            int accumulatorExclusiveBufferNum,

Review Comment:
   Ditto: consider renaming this to `numAccumulatorExclusiveBuffers`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -227,23 +263,78 @@ public ResultPartition create(
             }
         } else if (type == ResultPartitionType.HYBRID_FULL
                 || type == ResultPartitionType.HYBRID_SELECTIVE) {
-            partition =
-                    new HsResultPartition(
-                            taskNameWithSubtaskAndId,
-                            partitionIndex,
-                            id,
-                            type,
-                            subpartitions.length,
-                            maxParallelism,
-                            batchShuffleReadBufferPool,
-                            batchShuffleReadIOExecutor,
-                            partitionManager,
-                            channelManager.createChannel().getPath(),
-                            networkBufferSize,
-                            
getHybridShuffleConfiguration(numberOfSubpartitions, type),
-                            bufferCompressor,
-                            isBroadcast,
-                            bufferPoolFactory);
+            if (tieredStorageConfiguration != null) {
+                // Create memory manager.
+                TieredStorageMemoryManager memoryManager =
+                        new TieredStorageMemoryManagerImpl(
+                                checkNotNull(tieredStorageConfiguration)
+                                        .getNumBuffersTriggerFlushRatio(),
+                                true);
+
+                // Create buffer accumulator.
+                int accumulatorExclusiveBufferNum =

Review Comment:
   Maybe use `numAccumulatorExclusiveBuffers` to replace 
`accumulatorExclusiveBufferNum`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -18,58 +18,435 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
 
-import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.SortBufferAccumulator;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskTierFactory;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.RemoteTierFactory;
 
-import javax.annotation.Nullable;
-
-import java.util.Arrays;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 
 /** Configurations for the Tiered Storage. */
 public class TieredStorageConfiguration {
 
-    // TODO, after implementing the tier factory, add appreciate 
implementations to the array.
-    private static final TierFactory[] DEFAULT_MEMORY_DISK_TIER_FACTORIES = 
new TierFactory[0];
+    private static final String DEFAULT_REMOTE_STORAGE_BASE_PATH = null;
+
+    private static final int DEFAULT_TIERED_STORAGE_BUFFER_SIZE = 32 * 1024;
+
+    private static final int DEFAULT_MEMORY_TIER_EXCLUSIVE_BUFFERS = 100;
+
+    private static final int DEFAULT_DISK_TIER_EXCLUSIVE_BUFFERS = 1;
+
+    private static final int DEFAULT_REMOTE_TIER_EXCLUSIVE_BUFFERS = 1;
+
+    private static final int 
DEFAULT_NUM_BUFFERS_USE_SORT_ACCUMULATOR_THRESHOLD = 512;
+
+    private static final int DEFAULT_MEMORY_TIER_NUM_BYTES_PER_SEGMENT = 320 * 
1024;
+
+    private static final int DEFAULT_DISK_TIER_NUM_BYTES_PER_SEGMENT = 8 * 
1024 * 1024;
+
+    private static final int DEFAULT_REMOTE_TIER_NUM_BYTES_PER_SEGMENT = 8 * 
1024 * 1024;
+
+    private static final float DEFAULT_NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.5f;

Review Comment:
   I have a concern here.
   A ratio of 0.5 may be too low a threshold for triggering the flush process; 
we had better raise it to at least 0.6. Too low a trigger ratio will generate 
too many small regions, which is bad for sequential read performance.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java:
##########
@@ -227,23 +263,78 @@ public ResultPartition create(
             }
         } else if (type == ResultPartitionType.HYBRID_FULL
                 || type == ResultPartitionType.HYBRID_SELECTIVE) {
-            partition =
-                    new HsResultPartition(
-                            taskNameWithSubtaskAndId,
-                            partitionIndex,
-                            id,
-                            type,
-                            subpartitions.length,
-                            maxParallelism,
-                            batchShuffleReadBufferPool,
-                            batchShuffleReadIOExecutor,
-                            partitionManager,
-                            channelManager.createChannel().getPath(),
-                            networkBufferSize,
-                            
getHybridShuffleConfiguration(numberOfSubpartitions, type),
-                            bufferCompressor,
-                            isBroadcast,
-                            bufferPoolFactory);
+            if (tieredStorageConfiguration != null) {
+                // Create memory manager.
+                TieredStorageMemoryManager memoryManager =
+                        new TieredStorageMemoryManagerImpl(
+                                checkNotNull(tieredStorageConfiguration)
+                                        .getNumBuffersTriggerFlushRatio(),
+                                true);
+
+                // Create buffer accumulator.
+                int accumulatorExclusiveBufferNum =

Review Comment:
   Maybe use `numAccumulatorExclusiveBuffers` to replace 
`accumulatorExclusiveBufferNum`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to