zentol commented on code in PR #22169:
URL: https://github.com/apache/flink/pull/22169#discussion_r1133956427
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java:
##########
@@ -18,14 +18,51 @@
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 /** Maintains the configured parallelisms for vertices, which should be defined by a scheduler. */
 public class DefaultVertexParallelismStore implements MutableVertexParallelismStore {
+
+    /**
+     * Create a new {@link VertexParallelismStore} that reflects given {@link
+     * JobResourceRequirements}.
+     *
+     * @param oldVertexParallelismStore old vertex parallelism store that serves as a base for the
+     *     new one
+     * @param jobResourceRequirements to apply over the old vertex parallelism store
+     * @return new vertex parallelism store iff it was updated
+     */
+    public static Optional<VertexParallelismStore> applyJobResourceRequirements(
+            VertexParallelismStore oldVertexParallelismStore,
+            JobResourceRequirements jobResourceRequirements) {
+        final DefaultVertexParallelismStore newVertexParallelismStore =
+                new DefaultVertexParallelismStore();
+        boolean changed = false;
+        for (final JobVertexID jobVertexId : jobResourceRequirements.getJobVertices()) {
+            final VertexParallelismInformation oldVertexParallelismInfo =
+                    oldVertexParallelismStore.getParallelismInfo(jobVertexId);
+            final int parallelism =
+                    jobResourceRequirements
+                            .findParallelism(jobVertexId)
+                            .map(JobVertexResourceRequirements.Parallelism::getUpperBound)
+                            .orElseGet(oldVertexParallelismInfo::getParallelism);
+            newVertexParallelismStore.setParallelismInfo(
+                    jobVertexId,
+                    new DefaultVertexParallelismInfo(
+                            parallelism,
+                            oldVertexParallelismInfo.getMaxParallelism(),
+                            maxParallelism -> Optional.of("Cannot change the max parallelism.")));

Review Comment:
   Would it make sense to make this lambda a singleton?
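   A minimal, self-contained sketch of what a shared constant could look like, assuming the
   validator argument of DefaultVertexParallelismInfo is a plain java.util.function.Function
   returning Optional<String> (the class and field names below are illustrative only, not part
   of this PR):

   import java.util.Optional;
   import java.util.function.Function;

   // Illustrative sketch: hoist the rejection lambda into one shared constant so every
   // vertex reuses the same Function instance instead of allocating a fresh lambda.
   public class MaxParallelismRejectionSketch {

       // A non-empty Optional signals that the max-parallelism change is rejected,
       // mirroring the inline lambda in applyJobResourceRequirements.
       static final Function<Integer, Optional<String>> REJECT_MAX_PARALLELISM_CHANGE =
               maxParallelism -> Optional.of("Cannot change the max parallelism.");

       public static void main(String[] args) {
           // The same instance could then be passed for every vertex.
           System.out.println(REJECT_MAX_PARALLELISM_CHANGE.apply(128));
       }
   }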
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java:
##########
@@ -18,14 +18,51 @@
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 /** Maintains the configured parallelisms for vertices, which should be defined by a scheduler. */
 public class DefaultVertexParallelismStore implements MutableVertexParallelismStore {
+
+    /**
+     * Create a new {@link VertexParallelismStore} that reflects given {@link
+     * JobResourceRequirements}.
+     *
+     * @param oldVertexParallelismStore old vertex parallelism store that serves as a base for the
+     *     new one
+     * @param jobResourceRequirements to apply over the old vertex parallelism store
+     * @return new vertex parallelism store iff it was updated
+     */
+    public static Optional<VertexParallelismStore> applyJobResourceRequirements(
+            VertexParallelismStore oldVertexParallelismStore,
+            JobResourceRequirements jobResourceRequirements) {
+        final DefaultVertexParallelismStore newVertexParallelismStore =
+                new DefaultVertexParallelismStore();
+        boolean changed = false;
+        for (final JobVertexID jobVertexId : jobResourceRequirements.getJobVertices()) {
+            final VertexParallelismInformation oldVertexParallelismInfo =
+                    oldVertexParallelismStore.getParallelismInfo(jobVertexId);
+            final int parallelism =
+                    jobResourceRequirements
+                            .findParallelism(jobVertexId)
+                            .map(JobVertexResourceRequirements.Parallelism::getUpperBound)
+                            .orElseGet(oldVertexParallelismInfo::getParallelism);

Review Comment:
   Shouldn't this throw an IllegalStateException, since we already validated earlier that all
   vertices are provided with the new requirements? (A minimal illustrative sketch of this
   fail-fast pattern follows at the end of this message.)

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1050,6 +1055,14 @@ public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(
                 .orElseGet(CreatingExecutionGraph.AssignmentResult::notPossible);
     }
 
+    @Override
+    public void freeExcessiveReservedSlots() {
+        for (SlotInfo slotInfo : declarativeSlotPool.getFreeSlotsInformation()) {
+            declarativeSlotPool.freeReservedSlot(
+                    slotInfo.getAllocationId(), null, System.currentTimeMillis());
+        }

Review Comment:
   Not sure what you're trying to do here. Freeing a free slot is a no-op.
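   Regarding the findParallelism comment above: a minimal, self-contained sketch of the
   suggested fail-fast behaviour, assuming the earlier validation really does guarantee that
   every job vertex has an entry in the new requirements (the class, method, and message below
   are illustrative, not taken from this PR):

   import java.util.Optional;

   // Illustrative sketch: if prior validation guarantees the value is present, an empty
   // Optional indicates a bug, so failing fast beats silently falling back to the old value.
   public class FailFastParallelismSketch {

       static int resolveParallelism(Optional<Integer> requestedUpperBound, String vertexId) {
           return requestedUpperBound.orElseThrow(
                   () ->
                           new IllegalStateException(
                                   "No parallelism requirement found for vertex " + vertexId));
       }

       public static void main(String[] args) {
           // Prints 4; an empty Optional would throw an IllegalStateException instead.
           System.out.println(resolveParallelism(Optional.of(4), "vertex-1"));
       }
   }

   In the reviewed method this would roughly correspond to replacing the
   .orElseGet(oldVertexParallelismInfo::getParallelism) fallback with an .orElseThrow(...),
   provided the earlier validation indeed covers all vertices.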