reswqa commented on code in PR #23927: URL: https://github.com/apache/flink/pull/23927#discussion_r1446953814

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##########
@@ -58,6 +59,12 @@ class CreditBasedSequenceNumberingViewReader

     private final int initialCredit;

+    /**
+     * Cache of the index of the only subpartition if the underlining {@link ResultSubpartitionView}
+     * only consumes one subpartition.
+     */
+    private int subpartitionId;

Review Comment:
   We do need some explanation about the default value `-1`.


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java:
##########
@@ -301,6 +302,24 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
                 partitionId.getPartitionId(), resultPartitionBytes);
     }

+    @Override
+    public ResultSubpartitionView createSubpartitionView(
+            ResultSubpartitionIndexSet indexSet, BufferAvailabilityListener availabilityListener)
+            throws IOException {
+        // The ability to support multiple indexes is to be provided in subsequent commits of
+        // the corresponding pull request. As the function is about to be supported uniformly with
+        // one set of code, they will be placed in a common method shared by all shuffle
+        // implementations, and that will be this method.
+        Iterator<Integer> iterator = indexSet.values().iterator();
+        int index = iterator.next();
+        Preconditions.checkState(!iterator.hasNext());
+        return createSubpartitionView(index, availabilityListener);
+    }
+
+    /** Returns a reader for the subpartition with the given index. */

Review Comment:
   We need a full java doc to explain the differences and connections between this method and the previous ones.

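
   For illustration only, here is a minimal, self-contained sketch of the kind of documentation the two comments above ask for. It is not Flink code: the class `SingleSubpartitionSketch`, the constant `UNKNOWN_SUBPARTITION`, and both methods are hypothetical stand-ins. The reading of `-1` as "the view does not map to exactly one subpartition" is an assumption based on the field's Javadoc and would need to be confirmed by the PR author.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the classes touched by the PR; not Flink code. */
final class SingleSubpartitionSketch {

    /**
     * Sentinel for "no single subpartition is cached". Assumption: this is what
     * the default value -1 in the PR is meant to express.
     */
    static final int UNKNOWN_SUBPARTITION = -1;

    /**
     * Returns the only index contained in {@code indexes}.
     *
     * <p>This mirrors the set-based overload in the diff: it accepts a collection
     * of indexes but, for now, delegates to single-index logic and rejects
     * anything other than exactly one element.
     *
     * @throws IllegalStateException if {@code indexes} is empty or has more than one element
     */
    static int onlyIndex(Iterable<Integer> indexes) {
        Iterator<Integer> iterator = indexes.iterator();
        if (!iterator.hasNext()) {
            throw new IllegalStateException("expected exactly one index, got none");
        }
        int index = iterator.next();
        if (iterator.hasNext()) {
            throw new IllegalStateException("expected exactly one index, got several");
        }
        return index;
    }

    /**
     * Returns the index to cache: the single index if there is exactly one,
     * otherwise {@link #UNKNOWN_SUBPARTITION}.
     */
    static int cachedSubpartitionId(List<Integer> indexes) {
        return indexes.size() == 1 ? indexes.get(0) : UNKNOWN_SUBPARTITION;
    }
}
```

   Naming the sentinel and documenting both the single-index case and the fallback in the Javadoc would make the meaning of `-1` explicit at every use site, rather than leaving it implied by the field's initial value.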

##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##########
@@ -229,6 +245,14 @@ ResultSubpartitionView.AvailabilityWithBacklog hasBuffersAvailable() {
         return subpartitionView.getAvailabilityAndBacklog(Integer.MAX_VALUE);
     }

+    @Override
+    public int peekNextBufferSubpartitionId() throws IOException {

Review Comment:
   Do we have some tests to cover the method like this one introduced in this commit?


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -100,6 +104,13 @@ public Optional<Buffer> getNextBuffer(
         }
         Buffer bufferData = buffer.get();
         if (bufferData.getDataType() == Buffer.DataType.END_OF_SEGMENT) {
+            EndOfSegmentEvent event =
+                    (EndOfSegmentEvent)
+                            EventSerializer.fromSerializedEvent(
+                                    bufferData.getNioBufferReadable(), getClass().getClassLoader());
+            Preconditions.checkState(
+                    subpartitionId.equals(
+                            new TieredStorageSubpartitionId(event.getSubpartitionId())));

Review Comment:
   Is this deserialization only for sanity check?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org