azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r290383005
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java ########## @@ -99,110 +96,161 @@ public SingleInputGate create( @Nonnull InputGateDeploymentDescriptor igdd, @Nonnull PartitionProducerStateProvider partitionProducerStateProvider, @Nonnull InputChannelMetrics metrics) { - final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId()); - final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType()); - - final int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex(); - checkArgument(consumedSubpartitionIndex >= 0); - - final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors()); + SupplierWithException<BufferPool, IOException> bufferPoolFactory = createBufferPoolFactory( + networkBufferPool, + isCreditBased, + networkBuffersPerChannel, + floatingNetworkBuffersPerGate, + igdd.getInputChannelDescriptors().length, + igdd.getConsumedPartitionType()); - final SingleInputGate inputGate = new SingleInputGate( + SingleInputGate inputGate = new SingleInputGate( owningTaskName, - consumedResultId, - consumedPartitionType, - consumedSubpartitionIndex, - icdd.length, + igdd.getConsumedResultId(), + igdd.getConsumedPartitionType(), + igdd.getConsumedSubpartitionIndex(), + igdd.getInputChannelDescriptors().length, partitionProducerStateProvider, isCreditBased, - createBufferPoolFactory(icdd.length, consumedPartitionType)); + bufferPoolFactory); + + createInputChannels(owningTaskName, igdd, inputGate, metrics); + return inputGate; + } + + private void createInputChannels( + String owningTaskName, + InputGateDeploymentDescriptor inputGateDeploymentDescriptor, + SingleInputGate inputGate, + InputChannelMetrics metrics) { + ShuffleDescriptor[] inputChannelDescriptors = + inputGateDeploymentDescriptor.getInputChannelDescriptors(); // Create the input channels. There is one input channel for each consumed partition. - final InputChannel[] inputChannels = new InputChannel[icdd.length]; + InputChannel[] inputChannels = new InputChannel[inputChannelDescriptors.length]; - int numLocalChannels = 0; - int numRemoteChannels = 0; - int numUnknownChannels = 0; + ChannelStatistics channelStatistics = new ChannelStatistics(); for (int i = 0; i < inputChannels.length; i++) { - final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId(); - final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation(); - - if (partitionLocation.isLocal()) { - inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId, - partitionManager, - taskEventPublisher, - partitionRequestInitialBackoff, - partitionRequestMaxBackoff, - metrics); - - numLocalChannels++; - } - else if (partitionLocation.isRemote()) { - inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId, - partitionLocation.getConnectionId(), - connectionManager, - partitionRequestInitialBackoff, - partitionRequestMaxBackoff, - metrics, - networkBufferPool); - - numRemoteChannels++; - } - else if (partitionLocation.isUnknown()) { - inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId, - partitionManager, - taskEventPublisher, - connectionManager, - partitionRequestInitialBackoff, - partitionRequestMaxBackoff, - metrics, - networkBufferPool); - - numUnknownChannels++; - } - else { - throw new IllegalStateException("Unexpected partition location."); - } - - inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]); + inputChannels[i] = createInputChannel( + inputGate, + i, + inputGateDeploymentDescriptor.getConsumerLocation(), + inputChannelDescriptors[i], + channelStatistics, + metrics); + ResultPartitionID resultPartitionID = inputChannels[i].getPartitionId(); + inputGate.setInputChannel(resultPartitionID.getPartitionId(), inputChannels[i]); } - LOG.debug("{}: Created {} input channels (local: {}, remote: {}, unknown: {}).", + LOG.debug("{}: Created {} input channels ({}).", owningTaskName, inputChannels.length, - numLocalChannels, - numRemoteChannels, - numUnknownChannels); + channelStatistics); + } - return inputGate; + private InputChannel createInputChannel( + SingleInputGate inputGate, + int index, + ResourceID consumerLocation, + ShuffleDescriptor inputChannelDescriptor, Review comment: `shuffleDescriptor `, I will rename ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services