xintongsong commented on code in PR #20739:
URL: https://github.com/apache/flink/pull/20739#discussion_r962580760
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java:
##########
@@ -61,7 +63,7 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
     private final Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups =
             Collections.newSetFromMap(new IdentityHashMap<>());
-    private final Set<SchedulingPipelinedRegion> scheduledRegions =
+    private final Set<SchedulingPipelinedRegion> scheduledAndSchedulableRegions =

Review Comment:
   I'd suggest separating scheduled and schedulable regions. A region should be added to `scheduledRegions` only when it is actually scheduled. We can introduce another set, `schedulableRegions` (or `toBeScheduledRegions`), as a local variable in `getAllSchedulableRegions`.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java:
##########
@@ -226,44 +228,71 @@ private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions)
                 SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
                         schedulingTopology, regions);
-        final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
-        final Set<SchedulingPipelinedRegion> downstreamSchedulableRegions = new HashSet<>();
-        for (SchedulingPipelinedRegion region : regionsSorted) {
-            if (maybeScheduleRegion(region, consumableStatusCache)) {
-                downstreamSchedulableRegions.addAll(
-                        consumedPartitionGroupsOfRegion.getOrDefault(region, Collections.emptySet())
-                                .stream()
-                                .flatMap(
-                                        consumedPartitionGroups ->
-                                                partitionGroupConsumerRegions
-                                                        .getOrDefault(
-                                                                consumedPartitionGroups,
-                                                                Collections.emptySet())
-                                                        .stream())
-                                .collect(Collectors.toSet()));
-            }
+        List<SchedulingPipelinedRegion> regionsToSchedule = getAllSchedulableRegions(regionsSorted);
+
+        if (!regionsToSchedule.isEmpty()) {
+            // schedule regions
+            regionsToSchedule.forEach(this::scheduleRegion);
         }
+    }
-        if (!downstreamSchedulableRegions.isEmpty()) {
-            maybeScheduleRegions(downstreamSchedulableRegions);
+    private List<SchedulingPipelinedRegion> getAllSchedulableRegions(
+            List<SchedulingPipelinedRegion> regionsSorted) {
+        final List<SchedulingPipelinedRegion> regionsToSchedule = new LinkedList<>();
+        final Queue<SchedulingPipelinedRegion> regionQueue = new LinkedList<>(regionsSorted);
+        while (!regionQueue.isEmpty()) {
+            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
+            final Set<ConsumedPartitionGroup> consumedPartitionGroupCache = new HashSet<>();
+            final Set<SchedulingPipelinedRegion> addedToQueue = new HashSet<>();
+            final int currentSize = regionQueue.size();
+            for (int num = 0; num < currentSize; num++) {

Review Comment:
   IIUC, we read the queue in batches because the caches above must be reset between batches. We should state this explicitly in a comment. Moreover, it might be more straightforward to have a method that returns the next batch of regions, rather than using a queue.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java:
##########
@@ -226,44 +228,71 @@ private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions)
                 SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
                         schedulingTopology, regions);

Review Comment:
   I'm not entirely sure whether it is safe to do this sorting only once, outside the loop. There seems to be no guarantee that the downstream regions found in later iterations are in strict topological order. I don't see any immediate harm from this, but we'd better double-check.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
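The first two review suggestions can be illustrated together with a toy model. This is a minimal sketch under stated assumptions. It is not Flink's `PipelinedRegionSchedulingStrategy`:

- Regions are plain strings.
- `upstreamOf` and `downstreamOf` are hypothetical adjacency maps that stand in for the `ConsumedPartitionGroup` lookups.
- A region is assumed to be schedulable once every region it consumes from is either scheduled or already found schedulable.

The sketch applies the suggestions as follows:

- `scheduledRegions` is updated only when a region is actually scheduled.
- The schedulable set is a local variable of `getAllSchedulableRegions`.
- A hypothetical `nextBatch` method replaces the size-counted queue.
- The readiness cache is rebuilt for each batch, with a comment explaining why.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the review suggestions; NOT Flink's actual classes or APIs.
public class RegionBatchSketch {
    // Assumed topology: region -> regions it consumes from, and region -> its consumers.
    private final Map<String, List<String>> upstreamOf;
    private final Map<String, List<String>> downstreamOf;
    // Only regions that were actually scheduled are recorded here.
    final Set<String> scheduledRegions = new LinkedHashSet<>();

    RegionBatchSketch(Map<String, List<String>> upstreamOf, Map<String, List<String>> downstreamOf) {
        this.upstreamOf = upstreamOf;
        this.downstreamOf = downstreamOf;
    }

    void maybeScheduleRegions(List<String> regionsSorted) {
        for (String region : getAllSchedulableRegions(regionsSorted)) {
            scheduledRegions.add(region); // stands in for scheduleRegion(region)
        }
    }

    List<String> getAllSchedulableRegions(List<String> regionsSorted) {
        // Local to this call: regions found schedulable but not scheduled yet.
        final Set<String> schedulableRegions = new LinkedHashSet<>();
        List<String> batch = regionsSorted;
        while (!batch.isEmpty()) {
            // The cache memoizes readiness checks against schedulableRegions, which is
            // frozen while a batch is evaluated and grows only between batches.
            // Entries would be stale in the next batch, so the cache is rebuilt here.
            final Map<String, Boolean> readinessCache = new HashMap<>();
            final Set<String> newlySchedulable = new LinkedHashSet<>();
            for (String region : batch) {
                if (!scheduledRegions.contains(region)
                        && !schedulableRegions.contains(region)
                        && allUpstreamsReady(region, schedulableRegions, readinessCache)) {
                    newlySchedulable.add(region);
                }
            }
            schedulableRegions.addAll(newlySchedulable);
            batch = nextBatch(newlySchedulable);
        }
        return new ArrayList<>(schedulableRegions);
    }

    private boolean allUpstreamsReady(
            String region, Set<String> schedulable, Map<String, Boolean> cache) {
        for (String upstream : upstreamOf.getOrDefault(region, List.of())) {
            boolean ready =
                    cache.computeIfAbsent(
                            upstream, u -> scheduledRegions.contains(u) || schedulable.contains(u));
            if (!ready) {
                return false;
            }
        }
        return true;
    }

    // Next batch: consumers of the regions that just became schedulable (deduplicated).
    private List<String> nextBatch(Set<String> newlySchedulable) {
        final Set<String> next = new LinkedHashSet<>();
        for (String region : newlySchedulable) {
            next.addAll(downstreamOf.getOrDefault(region, List.of()));
        }
        return new ArrayList<>(next);
    }

    public static void main(String[] args) {
        // A -> B -> C and D -> C, so C consumes from both B and D.
        Map<String, List<String>> upstreamOf = Map.of("B", List.of("A"), "C", List.of("B", "D"));
        Map<String, List<String>> downstreamOf =
                Map.of("A", List.of("B"), "B", List.of("C"), "D", List.of("C"));
        RegionBatchSketch strategy = new RegionBatchSketch(upstreamOf, downstreamOf);
        strategy.maybeScheduleRegions(List.of("A", "D"));
        System.out.println(strategy.scheduledRegions); // [A, D, B, C]
    }
}
```

In this sketch, each batch is checked against a snapshot of the schedulable set that changes only between batches. As a result, which regions a batch finds schedulable does not depend on the order of regions within that batch, which sidesteps the ordering concern in the third comment. The cost is possibly extra batches: region C above is only found in the third batch. Whether this trade-off fits the real strategy, where consumability is decided per `ConsumedPartitionGroup`, would still need checking.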