reswqa commented on code in PR #22733:
URL: https://github.com/apache/flink/pull/22733#discussion_r1233995628


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java:
##########
@@ -217,6 +222,12 @@ public class SingleInputGate extends IndexedInputGate {
     // The consumer client will be null if the tiered storage is not enabled.
     @Nullable private final TieredStorageConsumerClient 
tieredStorageConsumerClient;
 
+    // The partition ids in tiered storage will be null if the tiered storage 
is not enabled.
+    @Nullable private final List<TieredStoragePartitionId> 
tieredStoragePartitionIds;
+
+    // The subpartition ids in tiered storage will be null if the tiered 
storage is not enabled.
+    @Nullable private final List<TieredStorageSubpartitionId> 
tieredStorageSubpartitionIds;

Review Comment:
   I'd suggest wrap this three field to a class(maybe called 
`TieredStorageConsumerSpec`).  
   
   `(tieredStoragePartitionIds).get(index)` can be rewritten to 
`consumerSpec.getPartitionId(index)`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java:
##########
@@ -158,9 +166,34 @@ public SingleInputGate create(
 
         IndexRange subpartitionIndexRange = 
igdd.getConsumedSubpartitionIndexRange();
         TieredStorageConsumerClient tieredStorageConsumerClient = null;
+        List<TieredStoragePartitionId> tieredStoragePartitionIds = null;
+        List<TieredStorageSubpartitionId> tieredStorageSubpartitionIds = null;
         if (tieredStorageConfiguration != null) {
+            ShuffleDescriptor[] shuffleDescriptors = 
igdd.getShuffleDescriptors();
+            tieredStoragePartitionIds = new ArrayList<>();
+            tieredStorageSubpartitionIds = new ArrayList<>();
+            List<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>>
+                    partitionIdAndSubpartitionIds = new ArrayList<>();
+            for (ShuffleDescriptor shuffleDescriptor : shuffleDescriptors) {
+                for (int subpartitionId = 
subpartitionIndexRange.getStartIndex();
+                        subpartitionId <= subpartitionIndexRange.getEndIndex();
+                        ++subpartitionId) {
+                    TieredStoragePartitionId storagePartitionId =
+                            TieredStorageIdMappingUtils.convertId(
+                                    shuffleDescriptor.getResultPartitionID());
+                    TieredStorageSubpartitionId storageSubpartitionId =
+                            new TieredStorageSubpartitionId(subpartitionId);
+                    tieredStoragePartitionIds.add(storagePartitionId);

Review Comment:
   Why this is not placed in the first `for-loop` block?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java:
##########
@@ -78,29 +75,22 @@ public void registerProducer(
     }
 
     @Override
-    public NettyConnectionReader registerConsumer(
+    public CompletableFuture<NettyConnectionReader> registerConsumer(
             TieredStoragePartitionId partitionId, TieredStorageSubpartitionId 
subpartitionId) {
-        Integer channelIndex = 
registeredChannelIndexes.get(partitionId).remove(subpartitionId);
-        if (registeredChannelIndexes.get(partitionId).isEmpty()) {
-            registeredChannelIndexes.remove(partitionId);
-        }
 
-        Supplier<InputChannel> inputChannelProvider =
-                
registeredInputChannelProviders.get(partitionId).remove(subpartitionId);
-        if (registeredInputChannelProviders.get(partitionId).isEmpty()) {
-            registeredInputChannelProviders.remove(partitionId);
+        List<NettyConnectionReaderRegistration> registrations =
+                getReaderRegistration(partitionId, subpartitionId);
+        for (NettyConnectionReaderRegistration registration : registrations) {
+            Optional<CompletableFuture<NettyConnectionReader>> futureOpt =
+                    registration.trySetConsumer();

Review Comment:
   We may also need `tryCreateNettyConnectionReader`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTieredStorageNettyService.java:
##########
@@ -49,9 +50,10 @@ public void registerProducer(
     }
 
     @Override
-    public NettyConnectionReader registerConsumer(
+    public CompletableFuture<NettyConnectionReader> registerConsumer(
             TieredStoragePartitionId partitionId, TieredStorageSubpartitionId 
subpartitionId) {
-        return registerConsumerFunction.apply(partitionId, subpartitionId);
+        return CompletableFuture.completedFuture(
+                registerConsumerFunction.apply(partitionId, subpartitionId));

Review Comment:
   `registerConsumerFunction ` should be a `BiFunction<TieredStoragePartitionId 
, TieredStorageSubpartitionId , CompletableFuture<NettyConnectionReader>>`.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java:
##########
@@ -280,7 +281,8 @@ private IndexedInputGate createInputGateWithMetrics(
                         gateIndex,
                         gateDescriptor,
                         SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER,
-                        newUnregisteredInputChannelMetrics());
+                        newUnregisteredInputChannelMetrics(),
+                        new TieredStorageNettyServiceImpl());

Review Comment:
   It's a bit strange: if I don't need to use `TS`, why do I also need to 
initialize `nettyService` in `NettyShuffleEnvironment`. It seems that we can 
make the it nullable for building `SingleInputGate`, and directly pass `null` 
here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java:
##########
@@ -813,6 +813,11 @@ private void checkPartitionRequestQueueInitialized() 
throws IOException {
                 "Bug: partitionRequestClient is not initialized before 
processing data and no error is detected.");
     }
 
+    @Override
+    public void notifyRequiredSegmentId(int segmentId) {

Review Comment:
   We need some tests for `notifyRequiredSegmentId`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to