xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r388776246
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java ########## @@ -114,4 +121,42 @@ public ActiveResourceManager( protected abstract Configuration loadClientConfiguration(); protected abstract double getCpuCores(final Configuration configuration); + + /** + * Utility class for counting pending workers per {@link WorkerResourceSpec}. + */ + @VisibleForTesting + static class PendingWorkerCounter { + private final Map<WorkerResourceSpec, Integer> pendingWorkerNums; + + @VisibleForTesting + PendingWorkerCounter() { + pendingWorkerNums = new HashMap<>(); + } + + public int getTotalNum() { + return pendingWorkerNums.values().stream().reduce(0, Integer::sum); + } + + public int getNum(final WorkerResourceSpec workerResourceSpec) { + return pendingWorkerNums.getOrDefault(Preconditions.checkNotNull(workerResourceSpec), 0); + } + + public int increateAndGet(final WorkerResourceSpec workerResourceSpec) { + return pendingWorkerNums.compute( + Preconditions.checkNotNull(workerResourceSpec), + (ignored, num) -> num != null ? num + 1 : 1); + } + + public int decreaseAndGet(final WorkerResourceSpec workerResourceSpec) { + final Integer newValue = pendingWorkerNums.compute( + Preconditions.checkNotNull(workerResourceSpec), Review comment: Same here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services