JunRuiLee commented on code in PR #25790:
URL: https://github.com/apache/flink/pull/25790#discussion_r1895383469


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java:
##########
@@ -139,7 +153,14 @@ public void resetPartitionInfo(int partitionIndex) {
     }
 
     public List<Long> getAggregatedSubpartitionBytes() {
-        checkState(aggregatedSubpartitionBytes != null, "Not all partition infos are ready");
-        return Collections.unmodifiableList(aggregatedSubpartitionBytes);
+        checkState(
+                aggregatedSubpartitionBytes != null
+                        || subpartitionBytesByPartitionIndex.size() == numOfPartitions,
+                "Not all partition infos are ready");
+        if (aggregatedSubpartitionBytes == null) {
+            return getAggregatedSubpartitionBytesInternal();

Review Comment:
   Here I call `getAggregatedSubpartitionBytesInternal` because I do not want 
to clear `subpartitionBytesByPartitionIndex` or set 
`aggregatedSubpartitionBytes`: the condition for aggregation is not met at 
this point.
   
   That is why I introduced `getAggregatedSubpartitionBytesInternal`, which 
only queries and does not update any state.
   
   I will add comments to explain this.
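
To illustrate the distinction between the query-only path and the state-updating path, here is a minimal sketch. It is not the code from this PR: `ResultInfoSketch`, `recordPartitionInfo`, `aggregateAndClear`, and `retainedPartitionCount` are hypothetical names, and only the three field names and `getAggregatedSubpartitionBytesInternal` are borrowed from the diff. The internal summation logic is an assumption about what such a helper might do.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for the class under review; not the Flink source. */
class ResultInfoSketch {
    private final int numOfPartitions;
    private final int numOfSubpartitions;
    private final Map<Integer, long[]> subpartitionBytesByPartitionIndex = new HashMap<>();
    // Stays null until the per-partition data has been collapsed into one list.
    private List<Long> aggregatedSubpartitionBytes;

    ResultInfoSketch(int numOfPartitions, int numOfSubpartitions) {
        this.numOfPartitions = numOfPartitions;
        this.numOfSubpartitions = numOfSubpartitions;
    }

    void recordPartitionInfo(int partitionIndex, long[] subpartitionBytes) {
        subpartitionBytesByPartitionIndex.put(partitionIndex, subpartitionBytes.clone());
    }

    /** Query only: sums bytes per subpartition and leaves every field untouched. */
    private List<Long> getAggregatedSubpartitionBytesInternal() {
        long[] sums = new long[numOfSubpartitions];
        for (long[] bytes : subpartitionBytesByPartitionIndex.values()) {
            for (int i = 0; i < numOfSubpartitions; i++) {
                sums[i] += bytes[i];
            }
        }
        List<Long> result = new ArrayList<>(numOfSubpartitions);
        for (long sum : sums) {
            result.add(sum);
        }
        return result;
    }

    /** Updating path (assumed): caches the sums and drops the per-partition data. */
    void aggregateAndClear() {
        aggregatedSubpartitionBytes = getAggregatedSubpartitionBytesInternal();
        subpartitionBytesByPartitionIndex.clear();
    }

    List<Long> getAggregatedSubpartitionBytes() {
        if (aggregatedSubpartitionBytes == null
                && subpartitionBytesByPartitionIndex.size() != numOfPartitions) {
            throw new IllegalStateException("Not all partition infos are ready");
        }
        if (aggregatedSubpartitionBytes == null) {
            // Aggregation condition not met: answer the query without mutating state.
            return Collections.unmodifiableList(getAggregatedSubpartitionBytesInternal());
        }
        return Collections.unmodifiableList(aggregatedSubpartitionBytes);
    }

    /** Exposed only so the sketch can show that a query keeps the per-partition data. */
    int retainedPartitionCount() {
        return subpartitionBytesByPartitionIndex.size();
    }
}
```

In this sketch, calling `getAggregatedSubpartitionBytes()` before aggregation leaves the per-partition map intact, so later per-partition lookups would still work; only `aggregateAndClear()` discards it.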
   


