reswqa commented on code in PR #22733: URL: https://github.com/apache/flink/pull/22733#discussion_r1233995628
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java: ########## @@ -217,6 +222,12 @@ public class SingleInputGate extends IndexedInputGate { // The consumer client will be null if the tiered storage is not enabled. @Nullable private final TieredStorageConsumerClient tieredStorageConsumerClient; + // The partition ids in tiered storage will be null if the tiered storage is not enabled. + @Nullable private final List<TieredStoragePartitionId> tieredStoragePartitionIds; + + // The subpartition ids in tiered storage will be null if the tiered storage is not enabled. + @Nullable private final List<TieredStorageSubpartitionId> tieredStorageSubpartitionIds; Review Comment: I'd suggest wrap this three field to a class(maybe called `TieredStorageConsumerSpec`). `(tieredStoragePartitionIds).get(index)` can be rewritten to `consumerSpec.getPartitionId(index)`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java: ########## @@ -158,9 +166,34 @@ public SingleInputGate create( IndexRange subpartitionIndexRange = igdd.getConsumedSubpartitionIndexRange(); TieredStorageConsumerClient tieredStorageConsumerClient = null; + List<TieredStoragePartitionId> tieredStoragePartitionIds = null; + List<TieredStorageSubpartitionId> tieredStorageSubpartitionIds = null; if (tieredStorageConfiguration != null) { + ShuffleDescriptor[] shuffleDescriptors = igdd.getShuffleDescriptors(); + tieredStoragePartitionIds = new ArrayList<>(); + tieredStorageSubpartitionIds = new ArrayList<>(); + List<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>> + partitionIdAndSubpartitionIds = new ArrayList<>(); + for (ShuffleDescriptor shuffleDescriptor : shuffleDescriptors) { + for (int subpartitionId = subpartitionIndexRange.getStartIndex(); + subpartitionId <= subpartitionIndexRange.getEndIndex(); + ++subpartitionId) { + TieredStoragePartitionId storagePartitionId = + TieredStorageIdMappingUtils.convertId( + shuffleDescriptor.getResultPartitionID()); + TieredStorageSubpartitionId storageSubpartitionId = + new TieredStorageSubpartitionId(subpartitionId); + tieredStoragePartitionIds.add(storagePartitionId); Review Comment: Why this is not placed in the first `for-loop` block? ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java: ########## @@ -78,29 +75,22 @@ public void registerProducer( } @Override - public NettyConnectionReader registerConsumer( + public CompletableFuture<NettyConnectionReader> registerConsumer( TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId) { - Integer channelIndex = registeredChannelIndexes.get(partitionId).remove(subpartitionId); - if (registeredChannelIndexes.get(partitionId).isEmpty()) { - registeredChannelIndexes.remove(partitionId); - } - Supplier<InputChannel> inputChannelProvider = - registeredInputChannelProviders.get(partitionId).remove(subpartitionId); - if (registeredInputChannelProviders.get(partitionId).isEmpty()) { - registeredInputChannelProviders.remove(partitionId); + List<NettyConnectionReaderRegistration> registrations = + getReaderRegistration(partitionId, subpartitionId); + for (NettyConnectionReaderRegistration registration : registrations) { + Optional<CompletableFuture<NettyConnectionReader>> futureOpt = + registration.trySetConsumer(); Review Comment: We may also need `tryCreateNettyConnectionReader`. ########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTieredStorageNettyService.java: ########## @@ -49,9 +50,10 @@ public void registerProducer( } @Override - public NettyConnectionReader registerConsumer( + public CompletableFuture<NettyConnectionReader> registerConsumer( TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId) { - return registerConsumerFunction.apply(partitionId, subpartitionId); + return CompletableFuture.completedFuture( + registerConsumerFunction.apply(partitionId, subpartitionId)); Review Comment: `registerConsumerFunction ` should be a `BiFunction<TieredStoragePartitionId , TieredStorageSubpartitionId , CompletableFuture<NettyConnectionReader>>`. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java: ########## @@ -280,7 +281,8 @@ private IndexedInputGate createInputGateWithMetrics( gateIndex, gateDescriptor, SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, - newUnregisteredInputChannelMetrics()); + newUnregisteredInputChannelMetrics(), + new TieredStorageNettyServiceImpl()); Review Comment: It's a bit strange: if I don't need to use `TS`, why do I also need to initialize `nettyService` in `NettyShuffleEnvironment`. It seems that we can make the it nullable for building `SingleInputGate`, and directly pass `null` here. ########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java: ########## @@ -813,6 +813,11 @@ private void checkPartitionRequestQueueInitialized() throws IOException { "Bug: partitionRequestClient is not initialized before processing data and no error is detected."); } + @Override + public void notifyRequiredSegmentId(int segmentId) { Review Comment: We need some tests for `notifyRequiredSegmentId`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org