xintongsong commented on code in PR #20739:
URL: https://github.com/apache/flink/pull/20739#discussion_r962580760


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java:
##########
@@ -61,7 +63,7 @@ public class PipelinedRegionSchedulingStrategy implements 
SchedulingStrategy {
     private final Set<ConsumedPartitionGroup> 
crossRegionConsumedPartitionGroups =
             Collections.newSetFromMap(new IdentityHashMap<>());
 
-    private final Set<SchedulingPipelinedRegion> scheduledRegions =
+    private final Set<SchedulingPipelinedRegion> 
scheduledAndSchedulableRegions =

Review Comment:
   I'd suggest to separate scheduled and schedulable regions. A region should 
be added to `scheduledRegions` only when it's actually scheduled. We can 
introduce another `schedulableRegions` (or `toBeScheduledRegions`), as a local 
variable, for `getAllSchedulableRegions`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java:
##########
@@ -226,44 +228,71 @@ private void maybeScheduleRegions(final 
Set<SchedulingPipelinedRegion> regions)
                 SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
                         schedulingTopology, regions);
 
-        final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new 
HashMap<>();
-        final Set<SchedulingPipelinedRegion> downstreamSchedulableRegions = 
new HashSet<>();
-        for (SchedulingPipelinedRegion region : regionsSorted) {
-            if (maybeScheduleRegion(region, consumableStatusCache)) {
-                downstreamSchedulableRegions.addAll(
-                        consumedPartitionGroupsOfRegion.getOrDefault(region, 
Collections.emptySet())
-                                .stream()
-                                .flatMap(
-                                        consumedPartitionGroups ->
-                                                partitionGroupConsumerRegions
-                                                        .getOrDefault(
-                                                                
consumedPartitionGroups,
-                                                                
Collections.emptySet())
-                                                        .stream())
-                                .collect(Collectors.toSet()));
-            }
+        List<SchedulingPipelinedRegion> regionsToSchedule = 
getAllSchedulableRegions(regionsSorted);
+
+        if (!regionsToSchedule.isEmpty()) {
+            // schedule regions
+            regionsToSchedule.forEach(this::scheduleRegion);
         }
+    }
 
-        if (!downstreamSchedulableRegions.isEmpty()) {
-            maybeScheduleRegions(downstreamSchedulableRegions);
+    private List<SchedulingPipelinedRegion> getAllSchedulableRegions(
+            List<SchedulingPipelinedRegion> regionsSorted) {
+        final List<SchedulingPipelinedRegion> regionsToSchedule = new 
LinkedList<>();
+        final Queue<SchedulingPipelinedRegion> regionQueue = new 
LinkedList<>(regionsSorted);
+        while (!regionQueue.isEmpty()) {
+            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = 
new HashMap<>();
+            final Set<ConsumedPartitionGroup> consumedPartitionGroupCache = 
new HashSet<>();
+            final Set<SchedulingPipelinedRegion> addedToQueue = new 
HashSet<>();
+            final int currentSize = regionQueue.size();
+            for (int num = 0; num < currentSize; num++) {

Review Comment:
   IIUC, the reason we read the queue in batches is that we need to reset the 
above caches between batches. We should explicitly comment this. Moreover, it 
might be more straightforward to have a method returns the next batch of 
regions, rather than using a queue.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java:
##########
@@ -226,44 +228,71 @@ private void maybeScheduleRegions(final 
Set<SchedulingPipelinedRegion> regions)
                 SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
                         schedulingTopology, regions);

Review Comment:
   I'm not entirely sure whether this sorting can be excluded from the loop. It 
seems there's no guarantee that the downstream regions being found are in 
strict topological order. I don't see immediate damage of this, but we'd better 
double check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to