reswqa commented on code in PR #23927:
URL: https://github.com/apache/flink/pull/23927#discussion_r1446953814


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##########
@@ -58,6 +59,12 @@ class CreditBasedSequenceNumberingViewReader
 
     private final int initialCredit;
 
+    /**
+     * Cache of the index of the only subpartition if the underlining {@link 
ResultSubpartitionView}
+     * only consumes one subpartition.
+     */
+    private int subpartitionId;

Review Comment:
   We do need some explanation about the default value `-1`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java:
##########
@@ -301,6 +302,24 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
                 partitionId.getPartitionId(), resultPartitionBytes);
     }
 
+    @Override
+    public ResultSubpartitionView createSubpartitionView(
+            ResultSubpartitionIndexSet indexSet, BufferAvailabilityListener 
availabilityListener)
+            throws IOException {
+        // The ability to support multiple indexes is to be provided in 
subsequent commits of
+        // the corresponding pull request. As the function is about to be 
supported uniformly with
+        // one set of code, they will be placed in a common method shared by 
all shuffle
+        // implementations, and that will be this method.
+        Iterator<Integer> iterator = indexSet.values().iterator();
+        int index = iterator.next();
+        Preconditions.checkState(!iterator.hasNext());
+        return createSubpartitionView(index, availabilityListener);
+    }
+
+    /** Returns a reader for the subpartition with the given index. */

Review Comment:
   We need a full java doc to explain the differences and connections between 
this method and the previous ones.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java:
##########
@@ -229,6 +245,14 @@ ResultSubpartitionView.AvailabilityWithBacklog 
hasBuffersAvailable() {
         return subpartitionView.getAvailabilityAndBacklog(Integer.MAX_VALUE);
     }
 
+    @Override
+    public int peekNextBufferSubpartitionId() throws IOException {

Review Comment:
   Do we have some tests to cover the method like this one introduced in this 
commit? 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -100,6 +104,13 @@ public Optional<Buffer> getNextBuffer(
         }
         Buffer bufferData = buffer.get();
         if (bufferData.getDataType() == Buffer.DataType.END_OF_SEGMENT) {
+            EndOfSegmentEvent event =
+                    (EndOfSegmentEvent)
+                            EventSerializer.fromSerializedEvent(
+                                    bufferData.getNioBufferReadable(), 
getClass().getClassLoader());
+            Preconditions.checkState(
+                    subpartitionId.equals(
+                            new 
TieredStorageSubpartitionId(event.getSubpartitionId())));

Review Comment:
   Is this deserialization only for sanity check?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to