JunRuiLee commented on code in PR #25790:
URL: https://github.com/apache/flink/pull/25790#discussion_r1895383469
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java:
##########
@@ -139,7 +153,14 @@ public void resetPartitionInfo(int partitionIndex) {
     }
 
     public List<Long> getAggregatedSubpartitionBytes() {
-        checkState(aggregatedSubpartitionBytes != null, "Not all partition infos are ready");
-        return Collections.unmodifiableList(aggregatedSubpartitionBytes);
+        checkState(
+                aggregatedSubpartitionBytes != null
+                        || subpartitionBytesByPartitionIndex.size() == numOfPartitions,
+                "Not all partition infos are ready");
+        if (aggregatedSubpartitionBytes == null) {
+            return getAggregatedSubpartitionBytesInternal();

Review Comment:
   Here I call `getAggregatedSubpartitionBytesInternal` because the condition for aggregation is not met at this point, so I do not want to clear `subpartitionBytesByPartitionIndex` or set `aggregatedSubpartitionBytes`. That is why I introduced `getAggregatedSubpartitionBytesInternal`: it only queries and does not update any state. I will add comments to explain this.
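
   A minimal sketch of the distinction described above, assuming a heavily simplified stand-in class. This is not the actual `AllToAllBlockingResultInfo`: the class name `ResultInfoSketch` and the helpers `recordPartitionInfo`, `commitAggregation`, `isAggregated`, and `numOfRetainedPartitions` are hypothetical and exist only for illustration. The sketch shows a getter that computes the sums on demand without clearing the per-partition map or setting the cached list, next to a separate state-updating path that does both.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in; not the actual AllToAllBlockingResultInfo.
final class ResultInfoSketch {
    private final int numOfPartitions;
    private final int numOfSubpartitions;
    // Raw per-partition sizes, retained until aggregation is committed.
    private final Map<Integer, long[]> subpartitionBytesByPartitionIndex = new HashMap<>();
    // Cached sums; stays null until commitAggregation() runs.
    private List<Long> aggregatedSubpartitionBytes;

    ResultInfoSketch(int numOfPartitions, int numOfSubpartitions) {
        this.numOfPartitions = numOfPartitions;
        this.numOfSubpartitions = numOfSubpartitions;
    }

    void recordPartitionInfo(int partitionIndex, long[] subpartitionBytes) {
        if (subpartitionBytes.length != numOfSubpartitions) {
            throw new IllegalArgumentException("Unexpected number of subpartitions");
        }
        subpartitionBytesByPartitionIndex.put(partitionIndex, subpartitionBytes.clone());
    }

    // Assumed state-updating path: cache the sums, then drop the raw data.
    void commitAggregation() {
        aggregatedSubpartitionBytes = getAggregatedSubpartitionBytesInternal();
        subpartitionBytesByPartitionIndex.clear();
    }

    // Mirrors the getter in the diff: works once all partitions are recorded,
    // even if aggregation has not been committed yet.
    List<Long> getAggregatedSubpartitionBytes() {
        if (aggregatedSubpartitionBytes == null
                && subpartitionBytesByPartitionIndex.size() != numOfPartitions) {
            throw new IllegalStateException("Not all partition infos are ready");
        }
        if (aggregatedSubpartitionBytes == null) {
            // Query only: nothing is cached and nothing is cleared.
            return Collections.unmodifiableList(getAggregatedSubpartitionBytesInternal());
        }
        return Collections.unmodifiableList(aggregatedSubpartitionBytes);
    }

    // Pure computation over the retained per-partition data; no field is modified.
    private List<Long> getAggregatedSubpartitionBytesInternal() {
        long[] sums = new long[numOfSubpartitions];
        for (long[] bytes : subpartitionBytesByPartitionIndex.values()) {
            for (int i = 0; i < numOfSubpartitions; i++) {
                sums[i] += bytes[i];
            }
        }
        List<Long> result = new ArrayList<>(numOfSubpartitions);
        for (long sum : sums) {
            result.add(sum);
        }
        return result;
    }

    boolean isAggregated() {
        return aggregatedSubpartitionBytes != null;
    }

    int numOfRetainedPartitions() {
        return subpartitionBytesByPartitionIndex.size();
    }

    public static void main(String[] args) {
        ResultInfoSketch info = new ResultInfoSketch(2, 3);
        info.recordPartitionInfo(0, new long[] {1L, 2L, 3L});
        info.recordPartitionInfo(1, new long[] {10L, 20L, 30L});
        System.out.println(info.getAggregatedSubpartitionBytes()); // prints [11, 22, 33]
        System.out.println(info.isAggregated()); // prints false: the query left state untouched
    }
}
```

   In this sketch, calling the getter before `commitAggregation()` leaves both fields as they were, so a later `resetPartitionInfo`-style update could still operate on the per-partition data.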