wanglijie95 commented on a change in pull request #18130: URL: https://github.com/apache/flink/pull/18130#discussion_r791429362
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ########## @@ -94,6 +97,7 @@ protected InputChannel( this.inputGate = checkNotNull(inputGate); this.channelInfo = new InputChannelInfo(inputGate.getGateIndex(), channelIndex); this.partitionId = checkNotNull(partitionId); + this.consumedSubpartitionIndex = consumedSubpartitionIndex; Review comment: Done ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ########## @@ -168,8 +176,7 @@ protected void notifyBufferAvailable(int numAvailableBuffers) throws IOException * <p>The queue index to request depends on which sub task the channel belongs to and is * specified by the consumer of this channel. Review comment: Done ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ########## @@ -1073,7 +1085,32 @@ private boolean queueChannelUnsafe(InputChannel channel, boolean priority) { // ------------------------------------------------------------------------ - public Map<IntermediateResultPartitionID, InputChannel> getInputChannels() { + public Map<SubpartitionInfo, InputChannel> getInputChannels() { return inputChannels; } + + static class SubpartitionInfo { + private final IntermediateResultPartitionID partitionID; + private final int subpartitionIndex; + + SubpartitionInfo(IntermediateResultPartitionID partitionID, int subpartitionIndex) { + this.partitionID = partitionID; + this.subpartitionIndex = subpartitionIndex; + } + + @Override + public int hashCode() { + return partitionID.hashCode() ^ subpartitionIndex; + } + + @Override + public boolean equals(Object obj) { Review comment: Done ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java ########## @@ -224,15 +245,25 @@ private InputChannel createInputChannel( inputGate, 
index, nettyShuffleDescriptor, + consumedSubpartitionIndex, channelStatistics, metrics)); } + private static int calculateNumChannels( + int numShuffleDescriptors, SubpartitionIndexRange subpartitionIndexRange) { + return numShuffleDescriptors Review comment: Added. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ########## @@ -1073,7 +1085,32 @@ private boolean queueChannelUnsafe(InputChannel channel, boolean priority) { // ------------------------------------------------------------------------ - public Map<IntermediateResultPartitionID, InputChannel> getInputChannels() { + public Map<SubpartitionInfo, InputChannel> getInputChannels() { return inputChannels; } + + static class SubpartitionInfo { + private final IntermediateResultPartitionID partitionID; + private final int subpartitionIndex; + + SubpartitionInfo(IntermediateResultPartitionID partitionID, int subpartitionIndex) { + this.partitionID = partitionID; Review comment: Done ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ########## @@ -1073,7 +1085,32 @@ private boolean queueChannelUnsafe(InputChannel channel, boolean priority) { // ------------------------------------------------------------------------ - public Map<IntermediateResultPartitionID, InputChannel> getInputChannels() { + public Map<SubpartitionInfo, InputChannel> getInputChannels() { return inputChannels; } + + static class SubpartitionInfo { + private final IntermediateResultPartitionID partitionID; + private final int subpartitionIndex; + + SubpartitionInfo(IntermediateResultPartitionID partitionID, int subpartitionIndex) { + this.partitionID = partitionID; + this.subpartitionIndex = subpartitionIndex; Review comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org