azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r290383005
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
 ##########
 @@ -99,110 +96,161 @@ public SingleInputGate create(
                        @Nonnull InputGateDeploymentDescriptor igdd,
                        @Nonnull PartitionProducerStateProvider 
partitionProducerStateProvider,
                        @Nonnull InputChannelMetrics metrics) {
-               final IntermediateDataSetID consumedResultId = 
checkNotNull(igdd.getConsumedResultId());
-               final ResultPartitionType consumedPartitionType = 
checkNotNull(igdd.getConsumedPartitionType());
-
-               final int consumedSubpartitionIndex = 
igdd.getConsumedSubpartitionIndex();
-               checkArgument(consumedSubpartitionIndex >= 0);
-
-               final InputChannelDeploymentDescriptor[] icdd = 
checkNotNull(igdd.getInputChannelDeploymentDescriptors());
+               SupplierWithException<BufferPool, IOException> 
bufferPoolFactory = createBufferPoolFactory(
+                       networkBufferPool,
+                       isCreditBased,
+                       networkBuffersPerChannel,
+                       floatingNetworkBuffersPerGate,
+                       igdd.getInputChannelDescriptors().length,
+                       igdd.getConsumedPartitionType());
 
-               final SingleInputGate inputGate = new SingleInputGate(
+               SingleInputGate inputGate = new SingleInputGate(
                        owningTaskName,
-                       consumedResultId,
-                       consumedPartitionType,
-                       consumedSubpartitionIndex,
-                       icdd.length,
+                       igdd.getConsumedResultId(),
+                       igdd.getConsumedPartitionType(),
+                       igdd.getConsumedSubpartitionIndex(),
+                       igdd.getInputChannelDescriptors().length,
                        partitionProducerStateProvider,
                        isCreditBased,
-                       createBufferPoolFactory(icdd.length, 
consumedPartitionType));
+                       bufferPoolFactory);
+
+               createInputChannels(owningTaskName, igdd, inputGate, metrics);
+               return inputGate;
+       }
+
+       private void createInputChannels(
+                       String owningTaskName,
+                       InputGateDeploymentDescriptor 
inputGateDeploymentDescriptor,
+                       SingleInputGate inputGate,
+                       InputChannelMetrics metrics) {
+               ShuffleDescriptor[] inputChannelDescriptors =
+                       
inputGateDeploymentDescriptor.getInputChannelDescriptors();
 
                // Create the input channels. There is one input channel for 
each consumed partition.
-               final InputChannel[] inputChannels = new 
InputChannel[icdd.length];
+               InputChannel[] inputChannels = new 
InputChannel[inputChannelDescriptors.length];
 
-               int numLocalChannels = 0;
-               int numRemoteChannels = 0;
-               int numUnknownChannels = 0;
+               ChannelStatistics channelStatistics = new ChannelStatistics();
 
                for (int i = 0; i < inputChannels.length; i++) {
-                       final ResultPartitionID partitionId = 
icdd[i].getConsumedPartitionId();
-                       final ResultPartitionLocation partitionLocation = 
icdd[i].getConsumedPartitionLocation();
-
-                       if (partitionLocation.isLocal()) {
-                               inputChannels[i] = new 
LocalInputChannel(inputGate, i, partitionId,
-                                       partitionManager,
-                                       taskEventPublisher,
-                                       partitionRequestInitialBackoff,
-                                       partitionRequestMaxBackoff,
-                                       metrics);
-
-                               numLocalChannels++;
-                       }
-                       else if (partitionLocation.isRemote()) {
-                               inputChannels[i] = new 
RemoteInputChannel(inputGate, i, partitionId,
-                                       partitionLocation.getConnectionId(),
-                                       connectionManager,
-                                       partitionRequestInitialBackoff,
-                                       partitionRequestMaxBackoff,
-                                       metrics,
-                                       networkBufferPool);
-
-                               numRemoteChannels++;
-                       }
-                       else if (partitionLocation.isUnknown()) {
-                               inputChannels[i] = new 
UnknownInputChannel(inputGate, i, partitionId,
-                                       partitionManager,
-                                       taskEventPublisher,
-                                       connectionManager,
-                                       partitionRequestInitialBackoff,
-                                       partitionRequestMaxBackoff,
-                                       metrics,
-                                       networkBufferPool);
-
-                               numUnknownChannels++;
-                       }
-                       else {
-                               throw new IllegalStateException("Unexpected 
partition location.");
-                       }
-
-                       inputGate.setInputChannel(partitionId.getPartitionId(), 
inputChannels[i]);
+                       inputChannels[i] = createInputChannel(
+                               inputGate,
+                               i,
+                               
inputGateDeploymentDescriptor.getConsumerLocation(),
+                               inputChannelDescriptors[i],
+                               channelStatistics,
+                               metrics);
+                       ResultPartitionID resultPartitionID = 
inputChannels[i].getPartitionId();
+                       
inputGate.setInputChannel(resultPartitionID.getPartitionId(), inputChannels[i]);
                }
 
-               LOG.debug("{}: Created {} input channels (local: {}, remote: 
{}, unknown: {}).",
+               LOG.debug("{}: Created {} input channels ({}).",
                        owningTaskName,
                        inputChannels.length,
-                       numLocalChannels,
-                       numRemoteChannels,
-                       numUnknownChannels);
+                       channelStatistics);
+       }
 
-               return inputGate;
+       private InputChannel createInputChannel(
+                       SingleInputGate inputGate,
+                       int index,
+                       ResourceID consumerLocation,
+                       ShuffleDescriptor inputChannelDescriptor,
 
 Review comment:
   `shuffleDescriptor `, I will rename

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to