zentol commented on code in PR #22169:
URL: https://github.com/apache/flink/pull/22169#discussion_r1133956427


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java:
##########
@@ -18,14 +18,51 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 /** Maintains the configured parallelisms for vertices, which should be defined by a scheduler. */
 public class DefaultVertexParallelismStore implements MutableVertexParallelismStore {
+
+    /**
+     * Create a new {@link VertexParallelismStore} that reflects given {@link
+     * JobResourceRequirements}.
+     *
+     * @param oldVertexParallelismStore old vertex parallelism store that serves as a base for the
+     *     new one
+     * @param jobResourceRequirements to apply over the old vertex parallelism store
+     * @return new vertex parallelism store iff it was updated
+     */
+    public static Optional<VertexParallelismStore> applyJobResourceRequirements(
+            VertexParallelismStore oldVertexParallelismStore,
+            JobResourceRequirements jobResourceRequirements) {
+        final DefaultVertexParallelismStore newVertexParallelismStore =
+                new DefaultVertexParallelismStore();
+        boolean changed = false;
+        for (final JobVertexID jobVertexId : jobResourceRequirements.getJobVertices()) {
+            final VertexParallelismInformation oldVertexParallelismInfo =
+                    oldVertexParallelismStore.getParallelismInfo(jobVertexId);
+            final int parallelism =
+                    jobResourceRequirements
+                            .findParallelism(jobVertexId)
+                            .map(JobVertexResourceRequirements.Parallelism::getUpperBound)
+                            .orElseGet(oldVertexParallelismInfo::getParallelism);
+            newVertexParallelismStore.setParallelismInfo(
+                    jobVertexId,
+                    new DefaultVertexParallelismInfo(
+                            parallelism,
+                            oldVertexParallelismInfo.getMaxParallelism(),
+                            maxParallelism -> Optional.of("Cannot change the max parallelism.")));

Review Comment:
   Would it make sense to make this lambda a singleton?
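
   A minimal sketch of the idea, not the PR's actual code: the class and method names below are hypothetical, and the `Function<Integer, Optional<String>>` type is an assumption about what the third constructor argument accepts. The lambda captures nothing, so it can live in one shared constant instead of being written inline inside the loop. The Java language specification leaves it open whether evaluating a lambda yields a new or a reused instance, so the constant mainly makes the sharing explicit and gives the validator a name.

```java
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical holder class; it only illustrates hoisting the lambda into a constant. */
final class RescaleMaxParallelismSketch {

    /** Shared, stateless validator that rejects every max-parallelism change. */
    static final Function<Integer, Optional<String>> REJECT_MAX_PARALLELISM_CHANGE =
            maxParallelism -> Optional.of("Cannot change the max parallelism.");

    private RescaleMaxParallelismSketch() {}

    /** Returns the same validator instance on every call. */
    static Function<Integer, Optional<String>> rescaleMaxRejector() {
        return REJECT_MAX_PARALLELISM_CHANGE;
    }
}
```

   The loop body would then pass the constant in place of the inline lambda.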



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java:
##########
@@ -18,14 +18,51 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 /** Maintains the configured parallelisms for vertices, which should be defined by a scheduler. */
 public class DefaultVertexParallelismStore implements MutableVertexParallelismStore {
+
+    /**
+     * Create a new {@link VertexParallelismStore} that reflects given {@link
+     * JobResourceRequirements}.
+     *
+     * @param oldVertexParallelismStore old vertex parallelism store that serves as a base for the
+     *     new one
+     * @param jobResourceRequirements to apply over the old vertex parallelism store
+     * @return new vertex parallelism store iff it was updated
+     */
+    public static Optional<VertexParallelismStore> applyJobResourceRequirements(
+            VertexParallelismStore oldVertexParallelismStore,
+            JobResourceRequirements jobResourceRequirements) {
+        final DefaultVertexParallelismStore newVertexParallelismStore =
+                new DefaultVertexParallelismStore();
+        boolean changed = false;
+        for (final JobVertexID jobVertexId : jobResourceRequirements.getJobVertices()) {
+            final VertexParallelismInformation oldVertexParallelismInfo =
+                    oldVertexParallelismStore.getParallelismInfo(jobVertexId);
+            final int parallelism =
+                    jobResourceRequirements
+                            .findParallelism(jobVertexId)
+                            .map(JobVertexResourceRequirements.Parallelism::getUpperBound)
+                            .orElseGet(oldVertexParallelismInfo::getParallelism);

Review Comment:
   Shouldn't this throw an IllegalStateException, since we already validated
earlier that all vertices are provided with the new requirements?
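
   A minimal sketch of the suggested behavior, under simplifying assumptions: vertex IDs are plain strings, the requirements are a map from vertex ID to upper bound, and all names are hypothetical stand-ins rather than Flink classes. Instead of silently falling back to the old parallelism, a missing entry is treated as a broken invariant.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical simplification: requirements map a vertex id to its parallelism upper bound. */
final class RequiredParallelismSketch {

    private RequiredParallelismSketch() {}

    /** Stand-in for the lookup that returns an empty Optional for unknown vertices. */
    static Optional<Integer> findUpperBound(Map<String, Integer> requirements, String vertexId) {
        return Optional.ofNullable(requirements.get(vertexId));
    }

    /** Fails fast if a vertex has no requirements, since earlier validation should rule that out. */
    static int requireUpperBound(Map<String, Integer> requirements, String vertexId) {
        return findUpperBound(requirements, vertexId)
                .orElseThrow(
                        () ->
                                new IllegalStateException(
                                        "No resource requirements for vertex " + vertexId + "."));
    }
}
```

   With this shape, a gap in the earlier validation surfaces as an exception instead of an unnoticed fallback.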



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1050,6 +1055,14 @@ public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
                 .orElseGet(CreatingExecutionGraph.AssignmentResult::notPossible);
     }
 
+    @Override
+    public void freeExcessiveReservedSlots() {
+        for (SlotInfo slotInfo : declarativeSlotPool.getFreeSlotsInformation()) {
+            declarativeSlotPool.freeReservedSlot(
+                    slotInfo.getAllocationId(), null, System.currentTimeMillis());
+        }

Review Comment:
   Not sure what you're trying to do here. Freeing a free slot is a no-op.


