bernardodemarco commented on code in PR #9102: URL: https://github.com/apache/cloudstack/pull/9102#discussion_r1942940914
########## plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/actionworkers/KubernetesClusterResourceModifierActionWorker.java: ########## @@ -293,13 +259,35 @@ protected DeployDestination plan(final long nodesCount, final DataCenter zone, f throw new InsufficientServerCapacityException(msg, DataCenter.class, zone.getId()); } - protected DeployDestination plan() throws InsufficientServerCapacityException { - ServiceOffering offering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId()); + /** + * Plan Kubernetes Cluster Deployment + * @return a map of DeployDestination per node type + */ + protected Map<String, DeployDestination> planKubernetesCluster(Long domainId, Long accountId, Hypervisor.HypervisorType hypervisorType) throws InsufficientServerCapacityException { + Map<String, DeployDestination> destinationMap = new HashMap<>(); DataCenter zone = dataCenterDao.findById(kubernetesCluster.getZoneId()); if (logger.isDebugEnabled()) { logger.debug("Checking deployment destination for Kubernetes cluster: {} in zone: {}", kubernetesCluster, zone); } - return plan(kubernetesCluster.getTotalNodeCount(), zone, offering); + long controlNodeCount = kubernetesCluster.getControlNodeCount(); + long clusterSize = kubernetesCluster.getNodeCount(); + long etcdNodes = kubernetesCluster.getEtcdNodeCount(); + Map<String, Long> nodeTypeCount = Map.of(WORKER.name(), clusterSize, + CONTROL.name(), controlNodeCount, ETCD.name(), etcdNodes); + + for (KubernetesClusterNodeType nodeType : CLUSTER_NODES_TYPES_LIST) { + Long nodes = nodeTypeCount.getOrDefault(nodeType.name(), kubernetesCluster.getServiceOfferingId()); + if (nodes == null || nodes == 0) { + continue; + } + ServiceOffering nodeOffering = getServiceOfferingForNodeTypeOnCluster(nodeType, kubernetesCluster); + if (logger.isDebugEnabled()) { + logger.debug(String.format("Checking deployment destination for %s nodes on Kubernetes cluster : %s in zone : %s", nodeType.name(), 
kubernetesCluster.getName(), zone.getName())); + } Review Comment: ```suggestion if (logger.isDebugEnabled()) { logger.debug("Checking deployment destination for {} nodes on Kubernetes cluster : {} in zone : {}", nodeType.name(), kubernetesCluster.getName(), zone.getName()); } ``` ########## plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/KubernetesServiceHelperImpl.java: ########## @@ -106,6 +120,127 @@ public void checkVmCanBeDestroyed(UserVm userVm) { throw new CloudRuntimeException(msg); } + @Override + public boolean isValidNodeType(String nodeType) { + if (StringUtils.isBlank(nodeType)) { + return false; + } + try { + KubernetesClusterNodeType.valueOf(nodeType.toUpperCase()); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + + @Override + public Map<String, Long> getServiceOfferingNodeTypeMap(Map<String, Map<String, String>> serviceOfferingNodeTypeMap) { + Map<String, Long> mapping = new HashMap<>(); + if (MapUtils.isNotEmpty(serviceOfferingNodeTypeMap)) { + for (Map<String, String> entry : serviceOfferingNodeTypeMap.values()) { + processNodeTypeOfferingEntryAndAddToMappingIfValid(entry, mapping); + } + } + return mapping; + } + + protected void checkNodeTypeOfferingEntryCompleteness(String nodeTypeStr, String serviceOfferingUuid) { + if (StringUtils.isAnyEmpty(nodeTypeStr, serviceOfferingUuid)) { + String error = String.format("Incomplete Node Type to Service Offering ID mapping: '%s' -> '%s'", nodeTypeStr, serviceOfferingUuid); + logger.error(error); + throw new InvalidParameterValueException(error); + } + } + + protected void checkNodeTypeOfferingEntryValues(String nodeTypeStr, ServiceOffering serviceOffering, String serviceOfferingUuid) { + if (!isValidNodeType(nodeTypeStr)) { + String error = String.format("The provided value '%s' for Node Type is invalid", nodeTypeStr); + logger.error(error); + throw new InvalidParameterValueException(String.format(error)); + } + if (serviceOffering == 
null) { + String error = String.format("Cannot find a service offering with ID %s", serviceOfferingUuid); + logger.error(error); + throw new InvalidParameterValueException(error); + } + } + + protected void addNodeTypeOfferingEntry(String nodeTypeStr, String serviceOfferingUuid, ServiceOffering serviceOffering, Map<String, Long> mapping) { + if (logger.isDebugEnabled()) { + logger.debug(String.format("Node Type: '%s' should use Service Offering ID: '%s'", nodeTypeStr, serviceOfferingUuid)); + } Review Comment: ```suggestion if (logger.isDebugEnabled()) { logger.debug("Node Type: '{}' should use Service Offering ID: '{}'", nodeTypeStr, serviceOfferingUuid); } ``` ########## plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/KubernetesClusterManagerImpl.java: ########## @@ -1591,28 +1921,47 @@ public boolean scaleKubernetesCluster(ScaleKubernetesClusterCmd cmd) throws Clou logAndThrow(Level.ERROR, "Kubernetes Service plugin is disabled"); } validateKubernetesClusterScaleParameters(cmd); - KubernetesClusterVO kubernetesCluster = kubernetesClusterDao.findById(cmd.getId()); - final Long clusterSize = cmd.getClusterSize(); - if (clusterSize != null) { - CallContext.current().setEventDetails(String.format("Kubernetes cluster ID: %s scaling from size: %d to %d", - kubernetesCluster.getUuid(), kubernetesCluster.getNodeCount(), clusterSize)); - } + Map<String, ServiceOffering> nodeToOfferingMap = createNodeTypeToServiceOfferingMap(cmd.getServiceOfferingNodeTypeMap(), cmd.getServiceOfferingId(), kubernetesCluster); + String[] keys = getServiceUserKeys(kubernetesCluster); KubernetesClusterScaleWorker scaleWorker = new KubernetesClusterScaleWorker(kubernetesClusterDao.findById(cmd.getId()), - serviceOfferingDao.findById(cmd.getServiceOfferingId()), - clusterSize, - cmd.getNodeIds(), - cmd.isAutoscalingEnabled(), - cmd.getMinSize(), - cmd.getMaxSize(), - this); + nodeToOfferingMap, + cmd.getClusterSize(), + cmd.getNodeIds(), + 
cmd.isAutoscalingEnabled(), + cmd.getMinSize(), + cmd.getMaxSize(), + this); scaleWorker.setKeys(keys); scaleWorker = ComponentContext.inject(scaleWorker); return scaleWorker.scaleCluster(); } + /** + * Creates a map for the requested node type service offering + * For the node type ALL: Every node is scaled to the same offering + */ Review Comment: ```suggestion /** * Creates a map for the requested node type service offering * For the node type DEFAULT: Every node is scaled to the same offering */ ``` ########## plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/actionworkers/KubernetesClusterScaleWorker.java: ########## @@ -446,37 +528,86 @@ public boolean scaleCluster() throws CloudRuntimeException { } scaleTimeoutTime = System.currentTimeMillis() + KubernetesClusterService.KubernetesClusterScaleTimeout.value() * 1000; final long originalClusterSize = kubernetesCluster.getNodeCount(); - final ServiceOffering existingServiceOffering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId()); - if (existingServiceOffering == null) { - logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster %s failed, service offering for the Kubernetes cluster not found!", kubernetesCluster)); + if (serviceOfferingNodeTypeMap.containsKey(DEFAULT.name())) { + final ServiceOffering existingServiceOffering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId()); + if (existingServiceOffering == null) { + logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster : %s failed, service offering for the Kubernetes cluster not found!", kubernetesCluster.getName())); + } Review Comment: In this current validation flow, if a k8s cluster only has a compute offering for each node type (`service_offering_id == null && control_service_offering_id != null && worker_service_offering_id != null`), then the error message will always be thrown and the scaling will not proceed. 
Therefore, we should check if the existing `service_offering_id` is `null` and if any k8s plane offering is `null`. ```suggestion if (serviceOfferingNodeTypeMap.containsKey(DEFAULT.name())) { final ServiceOffering existingServiceOffering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId()); final ServiceOffering existingControlOffering = serviceOfferingDao.findById(kubernetesCluster.getControlServiceOfferingId()); final ServiceOffering existingWorkerOffering = serviceOfferingDao.findById(kubernetesCluster.getWorkerServiceOfferingId()); if (existingServiceOffering == null && ObjectUtils.anyNull(existingControlOffering, existingWorkerOffering)) { logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster : %s failed, service offering for the Kubernetes cluster not found!", kubernetesCluster.getName())); } ``` ########## plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/actionworkers/KubernetesClusterScaleWorker.java: ########## @@ -446,37 +528,86 @@ public boolean scaleCluster() throws CloudRuntimeException { } scaleTimeoutTime = System.currentTimeMillis() + KubernetesClusterService.KubernetesClusterScaleTimeout.value() * 1000; final long originalClusterSize = kubernetesCluster.getNodeCount(); - final ServiceOffering existingServiceOffering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId()); - if (existingServiceOffering == null) { - logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster %s failed, service offering for the Kubernetes cluster not found!", kubernetesCluster)); + if (serviceOfferingNodeTypeMap.containsKey(DEFAULT.name())) { + final ServiceOffering existingServiceOffering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId()); + if (existingServiceOffering == null) { + logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster : %s failed, service offering for the Kubernetes cluster not found!", kubernetesCluster.getName())); + } } + 
+ final boolean autoscalingChanged = isAutoscalingChanged(); - final boolean serviceOfferingScalingNeeded = serviceOffering != null && serviceOffering.getId() != existingServiceOffering.getId(); + boolean hasDefaultOffering = serviceOfferingNodeTypeMap.containsKey(DEFAULT.name()); Review Comment: The `hasDefaultOffering` variable could be defined earlier in this flow. If so, the preceding `if (serviceOfferingNodeTypeMap.containsKey(DEFAULT.name()))` condition could be replaced by it. ########## plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/KubernetesClusterManagerImpl.java: ########## @@ -1041,37 +1251,53 @@ private void validateKubernetesClusterScaleParameters(ScaleKubernetesClusterCmd } } - ServiceOffering serviceOffering = null; - if (serviceOfferingId != null) { - serviceOffering = serviceOfferingDao.findById(serviceOfferingId); - if (serviceOffering == null) { - throw new InvalidParameterValueException("Failed to find service offering ID: " + serviceOfferingId); - } else { - if (serviceOffering.isDynamic()) { - throw new InvalidParameterValueException(String.format("Custom service offerings are not supported for Kubernetes clusters.
Kubernetes cluster : %s, service offering : %s", kubernetesCluster.getName(), serviceOffering.getName())); - } - if (serviceOffering.getCpu() < MIN_KUBERNETES_CLUSTER_NODE_CPU || serviceOffering.getRamSize() < MIN_KUBERNETES_CLUSTER_NODE_RAM_SIZE) { - throw new InvalidParameterValueException(String.format("Kubernetes cluster : %s cannot be scaled with service offering : %s, Kubernetes cluster template(CoreOS) needs minimum %d vCPUs and %d MB RAM", - kubernetesCluster.getName(), serviceOffering.getName(), MIN_KUBERNETES_CLUSTER_NODE_CPU, MIN_KUBERNETES_CLUSTER_NODE_RAM_SIZE)); - } - if (serviceOffering.getCpu() < clusterVersion.getMinimumCpu()) { - throw new InvalidParameterValueException(String.format("Kubernetes cluster : %s cannot be scaled with service offering : %s, associated Kubernetes version : %s needs minimum %d vCPUs", - kubernetesCluster.getName(), serviceOffering.getName(), clusterVersion.getName(), clusterVersion.getMinimumCpu())); + validateServiceOfferingsForNodeTypesScale(serviceOfferingNodeTypeMap, defaultServiceOfferingId, kubernetesCluster, clusterVersion); + + validateKubernetesClusterScaleSize(kubernetesCluster, clusterSize, maxClusterSize, zone); + } + + protected void validateServiceOfferingsForNodeTypesScale(Map<String, Long> map, Long defaultServiceOfferingId, KubernetesClusterVO kubernetesCluster, KubernetesSupportedVersion clusterVersion) { + for (String key : CLUSTER_NODES_TYPES_LIST) { + Long serviceOfferingId = map.getOrDefault(key, defaultServiceOfferingId); + if (serviceOfferingId != null) { + ServiceOffering serviceOffering = serviceOfferingDao.findById(serviceOfferingId); + if (serviceOffering == null) { + throw new InvalidParameterValueException("Failed to find service offering ID: " + serviceOfferingId); } - if (serviceOffering.getRamSize() < clusterVersion.getMinimumRamSize()) { - throw new InvalidParameterValueException(String.format("Kubernetes cluster : %s cannot be scaled with service offering : %s, associated Kubernetes 
version : %s needs minimum %d MB RAM", - kubernetesCluster.getName(), serviceOffering.getName(), clusterVersion.getName(), clusterVersion.getMinimumRamSize())); + checkServiceOfferingForNodesScale(serviceOffering, kubernetesCluster, clusterVersion); + final ServiceOffering existingServiceOffering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId()); + if (KubernetesCluster.State.Running.equals(kubernetesCluster.getState()) && (serviceOffering.getRamSize() < existingServiceOffering.getRamSize() || + serviceOffering.getCpu() * serviceOffering.getSpeed() < existingServiceOffering.getCpu() * existingServiceOffering.getSpeed())) { + logAndThrow(Level.WARN, String.format("Kubernetes cluster cannot be scaled down for service offering. Service offering : %s offers lesser resources as compared to service offering : %s of Kubernetes cluster : %s", + serviceOffering.getName(), existingServiceOffering.getName(), kubernetesCluster.getName())); Review Comment: Shouldn't the existing service offering be retrieved according to the k8s node type? 
########## plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/KubernetesClusterManagerImpl.java: ########## @@ -1591,28 +1921,47 @@ public boolean scaleKubernetesCluster(ScaleKubernetesClusterCmd cmd) throws Clou logAndThrow(Level.ERROR, "Kubernetes Service plugin is disabled"); } validateKubernetesClusterScaleParameters(cmd); - KubernetesClusterVO kubernetesCluster = kubernetesClusterDao.findById(cmd.getId()); - final Long clusterSize = cmd.getClusterSize(); - if (clusterSize != null) { - CallContext.current().setEventDetails(String.format("Kubernetes cluster ID: %s scaling from size: %d to %d", - kubernetesCluster.getUuid(), kubernetesCluster.getNodeCount(), clusterSize)); - } + Map<String, ServiceOffering> nodeToOfferingMap = createNodeTypeToServiceOfferingMap(cmd.getServiceOfferingNodeTypeMap(), cmd.getServiceOfferingId(), kubernetesCluster); + String[] keys = getServiceUserKeys(kubernetesCluster); KubernetesClusterScaleWorker scaleWorker = new KubernetesClusterScaleWorker(kubernetesClusterDao.findById(cmd.getId()), - serviceOfferingDao.findById(cmd.getServiceOfferingId()), - clusterSize, - cmd.getNodeIds(), - cmd.isAutoscalingEnabled(), - cmd.getMinSize(), - cmd.getMaxSize(), - this); + nodeToOfferingMap, + cmd.getClusterSize(), + cmd.getNodeIds(), + cmd.isAutoscalingEnabled(), + cmd.getMinSize(), + cmd.getMaxSize(), + this); Review Comment: The first parameter of the `KubernetesClusterScaleWorker` constructor is the `KubernetesClusterVO` whose ID equals the `id` parameter specified in the API call. This `KubernetesClusterVO` has already been fetched from the database and is referenced by the `kubernetesCluster` variable (line 1924), so there is no need to fetch it again here. Passing `kubernetesCluster` as the first constructor argument is enough.
########## plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/actionworkers/KubernetesClusterScaleWorker.java: ########## @@ -446,37 +528,86 @@ public boolean scaleCluster() throws CloudRuntimeException { } scaleTimeoutTime = System.currentTimeMillis() + KubernetesClusterService.KubernetesClusterScaleTimeout.value() * 1000; final long originalClusterSize = kubernetesCluster.getNodeCount(); - final ServiceOffering existingServiceOffering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId()); - if (existingServiceOffering == null) { - logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster %s failed, service offering for the Kubernetes cluster not found!", kubernetesCluster)); + if (serviceOfferingNodeTypeMap.containsKey(DEFAULT.name())) { + final ServiceOffering existingServiceOffering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId()); + if (existingServiceOffering == null) { + logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster : %s failed, service offering for the Kubernetes cluster not found!", kubernetesCluster.getName())); + } } + final boolean autoscalingChanged = isAutoscalingChanged(); - final boolean serviceOfferingScalingNeeded = serviceOffering != null && serviceOffering.getId() != existingServiceOffering.getId(); + boolean hasDefaultOffering = serviceOfferingNodeTypeMap.containsKey(DEFAULT.name()); + Long existingDefaultOfferingId = kubernetesCluster.getServiceOfferingId(); + ServiceOffering defaultServiceOffering = serviceOfferingNodeTypeMap.getOrDefault(DEFAULT.name(), null); + + for (KubernetesClusterNodeType nodeType : Arrays.asList(CONTROL, ETCD, WORKER)) { + boolean isWorkerNodeOrAllNodes = WORKER == nodeType; + final long newVMRequired = (!isWorkerNodeOrAllNodes || clusterSize == null) ? 
0 : clusterSize - originalClusterSize; + if (!hasDefaultOffering && !serviceOfferingNodeTypeMap.containsKey(nodeType.name()) && newVMRequired == 0) { + continue; + } - if (autoscalingChanged) { - boolean autoScaled = autoscaleCluster(this.isAutoscalingEnabled, minSize, maxSize); - if (autoScaled && serviceOfferingScalingNeeded) { - scaleKubernetesClusterOffering(); + boolean serviceOfferingScalingNeeded = isServiceOfferingScalingNeededForNodeType(nodeType, serviceOfferingNodeTypeMap, kubernetesCluster, existingDefaultOfferingId); + ServiceOffering serviceOffering = serviceOfferingNodeTypeMap.getOrDefault(nodeType.name(), defaultServiceOffering); + boolean updateNodeOffering = serviceOfferingNodeTypeMap.containsKey(nodeType.name()); + boolean updateClusterOffering = isWorkerNodeOrAllNodes && hasDefaultOffering; + if (isWorkerNodeOrAllNodes && autoscalingChanged) { + boolean autoScaled = autoscaleCluster(this.isAutoscalingEnabled, minSize, maxSize); + if (autoScaled && serviceOfferingScalingNeeded) { + scaleKubernetesClusterOffering(nodeType, serviceOffering, updateNodeOffering, updateClusterOffering); + } + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.OperationSucceeded); + return autoScaled; } - stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.OperationSucceeded); - return autoScaled; - } - final boolean clusterSizeScalingNeeded = clusterSize != null && clusterSize != originalClusterSize; - final long newVMRequired = clusterSize == null ? 
0 : clusterSize - originalClusterSize; - if (serviceOfferingScalingNeeded && clusterSizeScalingNeeded) { - if (newVMRequired > 0) { - scaleKubernetesClusterOffering(); - scaleKubernetesClusterSize(); - } else { - scaleKubernetesClusterSize(); - scaleKubernetesClusterOffering(); + final boolean clusterSizeScalingNeeded = isWorkerNodeOrAllNodes && clusterSize != null && clusterSize != originalClusterSize; + if (serviceOfferingScalingNeeded && clusterSizeScalingNeeded) { + if (newVMRequired > 0) { + scaleKubernetesClusterOffering(nodeType, serviceOffering, updateNodeOffering, updateClusterOffering); + scaleKubernetesClusterSize(nodeType); + } else { + scaleKubernetesClusterSize(nodeType); + scaleKubernetesClusterOffering(nodeType, serviceOffering, updateNodeOffering, updateClusterOffering); + } + } else if (serviceOfferingScalingNeeded) { + scaleKubernetesClusterOffering(nodeType, serviceOffering, updateNodeOffering, updateClusterOffering); + } else if (clusterSizeScalingNeeded) { + scaleKubernetesClusterSize(nodeType); } - } else if (serviceOfferingScalingNeeded) { - scaleKubernetesClusterOffering(); - } else if (clusterSizeScalingNeeded) { - scaleKubernetesClusterSize(); } + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.OperationSucceeded); return true; } + + protected boolean isServiceOfferingScalingNeededForNodeType(KubernetesClusterNodeType nodeType, + Map<String, ServiceOffering> map, KubernetesCluster kubernetesCluster, + Long existingDefaultOfferingId) { + Long existingOfferingId = map.containsKey(DEFAULT.name()) ? 
+ existingDefaultOfferingId : + getExistingOfferingIdForNodeType(nodeType, kubernetesCluster); + if (existingOfferingId == null) { + logAndThrow(Level.ERROR, String.format("The Kubernetes cluster %s does not have a global service offering set", kubernetesCluster.getName())); + } + ServiceOffering existingOffering = serviceOfferingDao.findById(existingOfferingId); + if (existingOffering == null) { + logAndThrow(Level.ERROR, String.format("Cannot find the global service offering with ID %s set on the Kubernetes cluster %s", existingOfferingId, kubernetesCluster.getName())); + } + ServiceOffering newOffering = map.containsKey(DEFAULT.name()) ? map.get(DEFAULT.name()) : map.get(nodeType.name()); + return newOffering != null && newOffering.getId() != existingOffering.getId(); + } + + protected Long getExistingOfferingIdForNodeType(KubernetesClusterNodeType nodeType, KubernetesCluster kubernetesCluster) { + Long offeringId = null; + if (WORKER == nodeType) { + offeringId = kubernetesCluster.getWorkerServiceOfferingId(); + } else if (CONTROL == nodeType) { + offeringId = kubernetesCluster.getControlServiceOfferingId(); + } else if (ETCD == nodeType) { + offeringId = kubernetesCluster.getEtcdServiceOfferingId(); + } + if (offeringId == null) { + offeringId = kubernetesCluster.getServiceOfferingId(); + } + return offeringId; + } Review Comment: Imagine a scenario in which the k8s cluster is created specifying only the service offerings for each k8s plane. Later, if the user scales it, only changing the default service offering (only specifying the `serviceOfferingId` parameter), then the offerings of the k8s cluster VMs will be updated according to the offerings specified in the `scaleKubernetesCluster` API call. However, the `worker_service_offering_id` and `control_service_offering_id` attributes/columns of the cluster will still reference the old offerings that were set during the creation of the cluster. 
Therefore, in this scenario, this method will return an inconsistent offering ID. A workaround would be to fetch the k8s cluster VMs of the given plane from the DB. The method would then return the ID of the service offering of the plane's VMs (`UserVmVO`). ```suggestion protected Long getExistingOfferingIdForNodeType(KubernetesClusterNodeType nodeType, KubernetesCluster kubernetesCluster) { List<KubernetesClusterVmMapVO> clusterVms = kubernetesClusterVmMapDao.listByClusterIdAndVmType(kubernetesCluster.getId(), nodeType); if (CollectionUtils.isEmpty(clusterVms)) { return null; } KubernetesClusterVmMapVO clusterVm = clusterVms.get(0); UserVmVO clusterUserVm = userVmDao.findById(clusterVm.getVmId()); if (clusterUserVm == null) { return null; } return clusterUserVm.getServiceOfferingId(); } ``` ########## plugins/integrations/kubernetes-service/src/main/java/org/apache/cloudstack/api/command/user/kubernetes/cluster/CreateKubernetesClusterCmd.java: ########## @@ -81,7 +111,23 @@ public class CreateKubernetesClusterCmd extends BaseAsyncCreateCmd { @ACL(accessType = AccessType.UseEntry) @Parameter(name = ApiConstants.SERVICE_OFFERING_ID, type = CommandType.UUID, entityType = ServiceOfferingResponse.class, description = "the ID of the service offering for the virtual machines in the cluster.") - private Long serviceOfferingId; + protected Long serviceOfferingId; + + @ACL(accessType = AccessType.UseEntry) + @Parameter(name = ApiConstants.NODE_TYPE_OFFERING_MAP, type = CommandType.MAP, + description = "(Optional) Node Type to Service Offering ID mapping. If provided, it overrides the serviceofferingid parameter") + protected Map<String, Map<String, String>> serviceOfferingNodeTypeMap; + + @ACL(accessType = AccessType.UseEntry) + @Parameter(name = ApiConstants.NODE_TYPE_TEMPLATE_MAP, type = CommandType.MAP, + description = "(Optional) Node Type to Template ID mapping.
If provided, it overrides the default template: System VM template") + protected Map<String, Map<String, String>> templateNodeTypeMap; + + @ACL(accessType = AccessType.UseEntry) + @Parameter(name = ApiConstants.ETCD_NODES, type = CommandType.LONG, + description = "(Optional) Number of Kubernetes cluster etcd nodes, default is 0." + + "In case the number is greater than 0, etcd nodes are separate from master nodes and are provisioned accordingly") + protected Long etcdNodes; Review Comment: Is it necessary to define these attributes as `protected`? ########## plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/actionworkers/KubernetesClusterScaleWorker.java: ########## @@ -446,37 +528,86 @@ public boolean scaleCluster() throws CloudRuntimeException { } scaleTimeoutTime = System.currentTimeMillis() + KubernetesClusterService.KubernetesClusterScaleTimeout.value() * 1000; final long originalClusterSize = kubernetesCluster.getNodeCount(); - final ServiceOffering existingServiceOffering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId()); - if (existingServiceOffering == null) { - logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster %s failed, service offering for the Kubernetes cluster not found!", kubernetesCluster)); + if (serviceOfferingNodeTypeMap.containsKey(DEFAULT.name())) { + final ServiceOffering existingServiceOffering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId()); + if (existingServiceOffering == null) { + logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster : %s failed, service offering for the Kubernetes cluster not found!", kubernetesCluster.getName())); + } } + final boolean autoscalingChanged = isAutoscalingChanged(); - final boolean serviceOfferingScalingNeeded = serviceOffering != null && serviceOffering.getId() != existingServiceOffering.getId(); + boolean hasDefaultOffering = serviceOfferingNodeTypeMap.containsKey(DEFAULT.name()); + Long 
existingDefaultOfferingId = kubernetesCluster.getServiceOfferingId(); + ServiceOffering defaultServiceOffering = serviceOfferingNodeTypeMap.getOrDefault(DEFAULT.name(), null); + + for (KubernetesClusterNodeType nodeType : Arrays.asList(CONTROL, ETCD, WORKER)) { + boolean isWorkerNodeOrAllNodes = WORKER == nodeType; + final long newVMRequired = (!isWorkerNodeOrAllNodes || clusterSize == null) ? 0 : clusterSize - originalClusterSize; + if (!hasDefaultOffering && !serviceOfferingNodeTypeMap.containsKey(nodeType.name()) && newVMRequired == 0) { + continue; + } - if (autoscalingChanged) { - boolean autoScaled = autoscaleCluster(this.isAutoscalingEnabled, minSize, maxSize); - if (autoScaled && serviceOfferingScalingNeeded) { - scaleKubernetesClusterOffering(); + boolean serviceOfferingScalingNeeded = isServiceOfferingScalingNeededForNodeType(nodeType, serviceOfferingNodeTypeMap, kubernetesCluster, existingDefaultOfferingId); + ServiceOffering serviceOffering = serviceOfferingNodeTypeMap.getOrDefault(nodeType.name(), defaultServiceOffering); + boolean updateNodeOffering = serviceOfferingNodeTypeMap.containsKey(nodeType.name()); + boolean updateClusterOffering = isWorkerNodeOrAllNodes && hasDefaultOffering; + if (isWorkerNodeOrAllNodes && autoscalingChanged) { + boolean autoScaled = autoscaleCluster(this.isAutoscalingEnabled, minSize, maxSize); + if (autoScaled && serviceOfferingScalingNeeded) { + scaleKubernetesClusterOffering(nodeType, serviceOffering, updateNodeOffering, updateClusterOffering); + } + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.OperationSucceeded); + return autoScaled; } - stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.OperationSucceeded); - return autoScaled; - } - final boolean clusterSizeScalingNeeded = clusterSize != null && clusterSize != originalClusterSize; - final long newVMRequired = clusterSize == null ? 
0 : clusterSize - originalClusterSize; - if (serviceOfferingScalingNeeded && clusterSizeScalingNeeded) { - if (newVMRequired > 0) { - scaleKubernetesClusterOffering(); - scaleKubernetesClusterSize(); - } else { - scaleKubernetesClusterSize(); - scaleKubernetesClusterOffering(); + final boolean clusterSizeScalingNeeded = isWorkerNodeOrAllNodes && clusterSize != null && clusterSize != originalClusterSize; + if (serviceOfferingScalingNeeded && clusterSizeScalingNeeded) { + if (newVMRequired > 0) { + scaleKubernetesClusterOffering(nodeType, serviceOffering, updateNodeOffering, updateClusterOffering); + scaleKubernetesClusterSize(nodeType); + } else { + scaleKubernetesClusterSize(nodeType); + scaleKubernetesClusterOffering(nodeType, serviceOffering, updateNodeOffering, updateClusterOffering); + } + } else if (serviceOfferingScalingNeeded) { + scaleKubernetesClusterOffering(nodeType, serviceOffering, updateNodeOffering, updateClusterOffering); + } else if (clusterSizeScalingNeeded) { + scaleKubernetesClusterSize(nodeType); } - } else if (serviceOfferingScalingNeeded) { - scaleKubernetesClusterOffering(); - } else if (clusterSizeScalingNeeded) { - scaleKubernetesClusterSize(); } + stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.OperationSucceeded); return true; } + + protected boolean isServiceOfferingScalingNeededForNodeType(KubernetesClusterNodeType nodeType, + Map<String, ServiceOffering> map, KubernetesCluster kubernetesCluster, + Long existingDefaultOfferingId) { + Long existingOfferingId = map.containsKey(DEFAULT.name()) ? + existingDefaultOfferingId : + getExistingOfferingIdForNodeType(nodeType, kubernetesCluster); Review Comment: Similar to the suggestion on the `getExistingOfferingIdForNodeType` method, imagine that the k8s cluster only has a `control_service_offering_id` and `worker_service_offering_id` defined. Then, the parameter `existingDefaultOfferingId` would be `null`. 
If the user then scales the k8s cluster specifying only the default service offering, the local variable `existingOfferingId` will be `null`, and the scaling will fail with an error. Therefore, calling `getExistingOfferingIdForNodeType` alone is enough to retrieve the existing offering ID of a `nodeType`, and the `existingDefaultOfferingId` parameter can be removed as well. ```suggestion Long existingOfferingId = getExistingOfferingIdForNodeType(nodeType, kubernetesCluster); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cloudstack.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org