bernardodemarco commented on code in PR #9102:
URL: https://github.com/apache/cloudstack/pull/9102#discussion_r1942940914


##########
plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/actionworkers/KubernetesClusterResourceModifierActionWorker.java:
##########
@@ -293,13 +259,35 @@ protected DeployDestination plan(final long nodesCount, 
final DataCenter zone, f
         throw new InsufficientServerCapacityException(msg, DataCenter.class, 
zone.getId());
     }
 
-    protected DeployDestination plan() throws 
InsufficientServerCapacityException {
-        ServiceOffering offering = 
serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
+    /**
+     * Plan Kubernetes Cluster Deployment
+     * @return a map of DeployDestination per node type
+     */
+    protected Map<String, DeployDestination> planKubernetesCluster(Long 
domainId, Long accountId, Hypervisor.HypervisorType hypervisorType) throws 
InsufficientServerCapacityException {
+        Map<String, DeployDestination> destinationMap = new HashMap<>();
         DataCenter zone = 
dataCenterDao.findById(kubernetesCluster.getZoneId());
         if (logger.isDebugEnabled()) {
             logger.debug("Checking deployment destination for Kubernetes 
cluster: {} in zone: {}", kubernetesCluster, zone);
         }
-        return plan(kubernetesCluster.getTotalNodeCount(), zone, offering);
+        long controlNodeCount = kubernetesCluster.getControlNodeCount();
+        long clusterSize = kubernetesCluster.getNodeCount();
+        long etcdNodes = kubernetesCluster.getEtcdNodeCount();
+        Map<String, Long> nodeTypeCount = Map.of(WORKER.name(), clusterSize,
+                CONTROL.name(), controlNodeCount, ETCD.name(), etcdNodes);
+
+        for (KubernetesClusterNodeType nodeType : CLUSTER_NODES_TYPES_LIST) {
+            Long nodes = nodeTypeCount.getOrDefault(nodeType.name(), 
kubernetesCluster.getServiceOfferingId());
+            if (nodes == null || nodes == 0) {
+                continue;
+            }
+            ServiceOffering nodeOffering = 
getServiceOfferingForNodeTypeOnCluster(nodeType, kubernetesCluster);
+            if (logger.isDebugEnabled()) {
+                logger.debug(String.format("Checking deployment destination 
for %s nodes on Kubernetes cluster : %s in zone : %s", nodeType.name(), 
kubernetesCluster.getName(), zone.getName()));
+            }

Review Comment:
   ```suggestion
               if (logger.isDebugEnabled()) {
                   logger.debug("Checking deployment destination for {} nodes 
on Kubernetes cluster : {} in zone : {}", nodeType.name(), 
kubernetesCluster.getName(), zone.getName());
               }
   ```



##########
plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/KubernetesServiceHelperImpl.java:
##########
@@ -106,6 +120,127 @@ public void checkVmCanBeDestroyed(UserVm userVm) {
         throw new CloudRuntimeException(msg);
     }
 
+    @Override
+    public boolean isValidNodeType(String nodeType) {
+        if (StringUtils.isBlank(nodeType)) {
+            return false;
+        }
+        try {
+            KubernetesClusterNodeType.valueOf(nodeType.toUpperCase());
+            return true;
+        } catch (IllegalArgumentException e) {
+            return false;
+        }
+    }
+
+    @Override
+    public Map<String, Long> getServiceOfferingNodeTypeMap(Map<String, 
Map<String, String>> serviceOfferingNodeTypeMap) {
+        Map<String, Long> mapping = new HashMap<>();
+        if (MapUtils.isNotEmpty(serviceOfferingNodeTypeMap)) {
+            for (Map<String, String> entry : 
serviceOfferingNodeTypeMap.values()) {
+                processNodeTypeOfferingEntryAndAddToMappingIfValid(entry, 
mapping);
+            }
+        }
+        return mapping;
+    }
+
+    protected void checkNodeTypeOfferingEntryCompleteness(String nodeTypeStr, 
String serviceOfferingUuid) {
+        if (StringUtils.isAnyEmpty(nodeTypeStr, serviceOfferingUuid)) {
+            String error = String.format("Incomplete Node Type to Service 
Offering ID mapping: '%s' -> '%s'", nodeTypeStr, serviceOfferingUuid);
+            logger.error(error);
+            throw new InvalidParameterValueException(error);
+        }
+    }
+
+    protected void checkNodeTypeOfferingEntryValues(String nodeTypeStr, 
ServiceOffering serviceOffering, String serviceOfferingUuid) {
+        if (!isValidNodeType(nodeTypeStr)) {
+            String error = String.format("The provided value '%s' for Node 
Type is invalid", nodeTypeStr);
+            logger.error(error);
+            throw new InvalidParameterValueException(String.format(error));
+        }
+        if (serviceOffering == null) {
+            String error = String.format("Cannot find a service offering with 
ID %s", serviceOfferingUuid);
+            logger.error(error);
+            throw new InvalidParameterValueException(error);
+        }
+    }
+
+    protected void addNodeTypeOfferingEntry(String nodeTypeStr, String 
serviceOfferingUuid, ServiceOffering serviceOffering, Map<String, Long> 
mapping) {
+        if (logger.isDebugEnabled()) {
+            logger.debug(String.format("Node Type: '%s' should use Service 
Offering ID: '%s'", nodeTypeStr, serviceOfferingUuid));
+        }

Review Comment:
   ```suggestion
           if (logger.isDebugEnabled()) {
               logger.debug("Node Type: '{}' should use Service Offering ID: 
'{}'", nodeTypeStr, serviceOfferingUuid);
           }
   ```



##########
plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/KubernetesClusterManagerImpl.java:
##########
@@ -1591,28 +1921,47 @@ public boolean 
scaleKubernetesCluster(ScaleKubernetesClusterCmd cmd) throws Clou
             logAndThrow(Level.ERROR, "Kubernetes Service plugin is disabled");
         }
         validateKubernetesClusterScaleParameters(cmd);
-
         KubernetesClusterVO kubernetesCluster = 
kubernetesClusterDao.findById(cmd.getId());
-        final Long clusterSize = cmd.getClusterSize();
-        if (clusterSize != null) {
-            CallContext.current().setEventDetails(String.format("Kubernetes 
cluster ID: %s scaling from size: %d to %d",
-                    kubernetesCluster.getUuid(), 
kubernetesCluster.getNodeCount(), clusterSize));
-        }
+        Map<String, ServiceOffering> nodeToOfferingMap = 
createNodeTypeToServiceOfferingMap(cmd.getServiceOfferingNodeTypeMap(), 
cmd.getServiceOfferingId(), kubernetesCluster);
+
         String[] keys = getServiceUserKeys(kubernetesCluster);
         KubernetesClusterScaleWorker scaleWorker =
             new 
KubernetesClusterScaleWorker(kubernetesClusterDao.findById(cmd.getId()),
-                    serviceOfferingDao.findById(cmd.getServiceOfferingId()),
-                    clusterSize,
-                    cmd.getNodeIds(),
-                    cmd.isAutoscalingEnabled(),
-                    cmd.getMinSize(),
-                    cmd.getMaxSize(),
-                    this);
+                nodeToOfferingMap,
+                cmd.getClusterSize(),
+                cmd.getNodeIds(),
+                cmd.isAutoscalingEnabled(),
+                cmd.getMinSize(),
+                cmd.getMaxSize(),
+                this);
         scaleWorker.setKeys(keys);
         scaleWorker = ComponentContext.inject(scaleWorker);
         return scaleWorker.scaleCluster();
     }
 
+    /**
+     * Creates a map for the requested node type service offering
+     * For the node type ALL: Every node is scaled to the same offering
+     */

Review Comment:
   ```suggestion
       /**
        * Creates a map for the requested node type service offering
        * For the node type DEFAULT: Every node is scaled to the same offering
        */
   ```



##########
plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/actionworkers/KubernetesClusterScaleWorker.java:
##########
@@ -446,37 +528,86 @@ public boolean scaleCluster() throws 
CloudRuntimeException {
         }
         scaleTimeoutTime = System.currentTimeMillis() + 
KubernetesClusterService.KubernetesClusterScaleTimeout.value() * 1000;
         final long originalClusterSize = kubernetesCluster.getNodeCount();
-        final ServiceOffering existingServiceOffering = 
serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
-        if (existingServiceOffering == null) {
-            logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster 
%s failed, service offering for the Kubernetes cluster not found!", 
kubernetesCluster));
+        if (serviceOfferingNodeTypeMap.containsKey(DEFAULT.name())) {
+            final ServiceOffering existingServiceOffering = 
serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
+            if (existingServiceOffering == null) {
+                logAndThrow(Level.ERROR, String.format("Scaling Kubernetes 
cluster : %s failed, service offering for the Kubernetes cluster not found!", 
kubernetesCluster.getName()));
+            }

Review Comment:
   In the current validation flow, if a k8s cluster only has a compute 
offering for each node type (`service_offering_id == null && 
control_service_offering_id != null && worker_service_offering_id != null`), 
then the exception will always be thrown and the scaling will not proceed.
   
   Therefore, we should only fail when the existing `service_offering_id` is 
`null` and at least one of the k8s plane offerings is also `null`.
   ```suggestion
           if (serviceOfferingNodeTypeMap.containsKey(DEFAULT.name())) {
               final ServiceOffering existingServiceOffering = 
serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
               final ServiceOffering existingControlOffering = 
serviceOfferingDao.findById(kubernetesCluster.getControlServiceOfferingId());
               final ServiceOffering existingWorkerOffering = 
serviceOfferingDao.findById(kubernetesCluster.getWorkerServiceOfferingId());
               if (existingServiceOffering == null && 
ObjectUtils.anyNull(existingControlOffering, existingWorkerOffering)) {
                   logAndThrow(Level.ERROR, String.format("Scaling Kubernetes 
cluster : %s failed, service offering for the Kubernetes cluster not found!", 
kubernetesCluster.getName()));
               }
   ```



##########
plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/actionworkers/KubernetesClusterScaleWorker.java:
##########
@@ -446,37 +528,86 @@ public boolean scaleCluster() throws 
CloudRuntimeException {
         }
         scaleTimeoutTime = System.currentTimeMillis() + 
KubernetesClusterService.KubernetesClusterScaleTimeout.value() * 1000;
         final long originalClusterSize = kubernetesCluster.getNodeCount();
-        final ServiceOffering existingServiceOffering = 
serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
-        if (existingServiceOffering == null) {
-            logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster 
%s failed, service offering for the Kubernetes cluster not found!", 
kubernetesCluster));
+        if (serviceOfferingNodeTypeMap.containsKey(DEFAULT.name())) {
+            final ServiceOffering existingServiceOffering = 
serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
+            if (existingServiceOffering == null) {
+                logAndThrow(Level.ERROR, String.format("Scaling Kubernetes 
cluster : %s failed, service offering for the Kubernetes cluster not found!", 
kubernetesCluster.getName()));
+            }
         }
+
         final boolean autoscalingChanged = isAutoscalingChanged();
-        final boolean serviceOfferingScalingNeeded = serviceOffering != null 
&& serviceOffering.getId() != existingServiceOffering.getId();
+        boolean hasDefaultOffering = 
serviceOfferingNodeTypeMap.containsKey(DEFAULT.name());

Review Comment:
   The `hasDefaultOffering` variable could be defined earlier in this flow, so 
that the `if (serviceOfferingNodeTypeMap.containsKey(DEFAULT.name()))` 
condition above can use the variable instead of repeating the lookup.



##########
plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/KubernetesClusterManagerImpl.java:
##########
@@ -1041,37 +1251,53 @@ private void 
validateKubernetesClusterScaleParameters(ScaleKubernetesClusterCmd
             }
         }
 
-        ServiceOffering serviceOffering = null;
-        if (serviceOfferingId != null) {
-            serviceOffering = serviceOfferingDao.findById(serviceOfferingId);
-            if (serviceOffering == null) {
-                throw new InvalidParameterValueException("Failed to find 
service offering ID: " + serviceOfferingId);
-            } else {
-                if (serviceOffering.isDynamic()) {
-                    throw new 
InvalidParameterValueException(String.format("Custom service offerings are not 
supported for Kubernetes clusters. Kubernetes cluster : %s, service offering : 
%s", kubernetesCluster.getName(), serviceOffering.getName()));
-                }
-                if (serviceOffering.getCpu() < MIN_KUBERNETES_CLUSTER_NODE_CPU 
|| serviceOffering.getRamSize() < MIN_KUBERNETES_CLUSTER_NODE_RAM_SIZE) {
-                    throw new 
InvalidParameterValueException(String.format("Kubernetes cluster : %s cannot be 
scaled with service offering : %s, Kubernetes cluster template(CoreOS) needs 
minimum %d vCPUs and %d MB RAM",
-                            kubernetesCluster.getName(), 
serviceOffering.getName(), MIN_KUBERNETES_CLUSTER_NODE_CPU, 
MIN_KUBERNETES_CLUSTER_NODE_RAM_SIZE));
-                }
-                if (serviceOffering.getCpu() < clusterVersion.getMinimumCpu()) 
{
-                    throw new 
InvalidParameterValueException(String.format("Kubernetes cluster : %s cannot be 
scaled with service offering : %s, associated Kubernetes version : %s needs 
minimum %d vCPUs",
-                            kubernetesCluster.getName(), 
serviceOffering.getName(), clusterVersion.getName(), 
clusterVersion.getMinimumCpu()));
+        validateServiceOfferingsForNodeTypesScale(serviceOfferingNodeTypeMap, 
defaultServiceOfferingId, kubernetesCluster, clusterVersion);
+
+        validateKubernetesClusterScaleSize(kubernetesCluster, clusterSize, 
maxClusterSize, zone);
+    }
+
+    protected void validateServiceOfferingsForNodeTypesScale(Map<String, Long> 
map, Long defaultServiceOfferingId, KubernetesClusterVO kubernetesCluster, 
KubernetesSupportedVersion clusterVersion) {
+        for (String key : CLUSTER_NODES_TYPES_LIST) {
+            Long serviceOfferingId = map.getOrDefault(key, 
defaultServiceOfferingId);
+            if (serviceOfferingId != null) {
+                ServiceOffering serviceOffering = 
serviceOfferingDao.findById(serviceOfferingId);
+                if (serviceOffering == null) {
+                    throw new InvalidParameterValueException("Failed to find 
service offering ID: " + serviceOfferingId);
                 }
-                if (serviceOffering.getRamSize() < 
clusterVersion.getMinimumRamSize()) {
-                    throw new 
InvalidParameterValueException(String.format("Kubernetes cluster : %s cannot be 
scaled with service offering : %s, associated Kubernetes version : %s needs 
minimum %d MB RAM",
-                            kubernetesCluster.getName(), 
serviceOffering.getName(), clusterVersion.getName(), 
clusterVersion.getMinimumRamSize()));
+                checkServiceOfferingForNodesScale(serviceOffering, 
kubernetesCluster, clusterVersion);
+                final ServiceOffering existingServiceOffering = 
serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
+                if 
(KubernetesCluster.State.Running.equals(kubernetesCluster.getState()) && 
(serviceOffering.getRamSize() < existingServiceOffering.getRamSize() ||
+                        serviceOffering.getCpu() * serviceOffering.getSpeed() 
< existingServiceOffering.getCpu() * existingServiceOffering.getSpeed())) {
+                    logAndThrow(Level.WARN, String.format("Kubernetes cluster 
cannot be scaled down for service offering. Service offering : %s offers lesser 
resources as compared to service offering : %s of Kubernetes cluster : %s",
+                            serviceOffering.getName(), 
existingServiceOffering.getName(), kubernetesCluster.getName()));

Review Comment:
   Shouldn't the existing service offering be retrieved according to the k8s 
node type?



##########
plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/KubernetesClusterManagerImpl.java:
##########
@@ -1591,28 +1921,47 @@ public boolean 
scaleKubernetesCluster(ScaleKubernetesClusterCmd cmd) throws Clou
             logAndThrow(Level.ERROR, "Kubernetes Service plugin is disabled");
         }
         validateKubernetesClusterScaleParameters(cmd);
-
         KubernetesClusterVO kubernetesCluster = 
kubernetesClusterDao.findById(cmd.getId());
-        final Long clusterSize = cmd.getClusterSize();
-        if (clusterSize != null) {
-            CallContext.current().setEventDetails(String.format("Kubernetes 
cluster ID: %s scaling from size: %d to %d",
-                    kubernetesCluster.getUuid(), 
kubernetesCluster.getNodeCount(), clusterSize));
-        }
+        Map<String, ServiceOffering> nodeToOfferingMap = 
createNodeTypeToServiceOfferingMap(cmd.getServiceOfferingNodeTypeMap(), 
cmd.getServiceOfferingId(), kubernetesCluster);
+
         String[] keys = getServiceUserKeys(kubernetesCluster);
         KubernetesClusterScaleWorker scaleWorker =
             new 
KubernetesClusterScaleWorker(kubernetesClusterDao.findById(cmd.getId()),
-                    serviceOfferingDao.findById(cmd.getServiceOfferingId()),
-                    clusterSize,
-                    cmd.getNodeIds(),
-                    cmd.isAutoscalingEnabled(),
-                    cmd.getMinSize(),
-                    cmd.getMaxSize(),
-                    this);
+                nodeToOfferingMap,
+                cmd.getClusterSize(),
+                cmd.getNodeIds(),
+                cmd.isAutoscalingEnabled(),
+                cmd.getMinSize(),
+                cmd.getMaxSize(),
+                this);

Review Comment:
   The first parameter of the `KubernetesClusterScaleWorker` class is the 
`KubernetesClusterVO` with an ID equal to the `id` parameter specified in the 
API call.
   
   Since the `KubernetesClusterVO` with the given ID is already fetched from 
the database and referenced through the variable `kubernetesCluster` (line 
1924), it is not necessary to fetch the VO again when setting the first 
parameter of the constructor; specifying the `kubernetesCluster` variable will 
suffice.



##########
plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/actionworkers/KubernetesClusterScaleWorker.java:
##########
@@ -446,37 +528,86 @@ public boolean scaleCluster() throws 
CloudRuntimeException {
         }
         scaleTimeoutTime = System.currentTimeMillis() + 
KubernetesClusterService.KubernetesClusterScaleTimeout.value() * 1000;
         final long originalClusterSize = kubernetesCluster.getNodeCount();
-        final ServiceOffering existingServiceOffering = 
serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
-        if (existingServiceOffering == null) {
-            logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster 
%s failed, service offering for the Kubernetes cluster not found!", 
kubernetesCluster));
+        if (serviceOfferingNodeTypeMap.containsKey(DEFAULT.name())) {
+            final ServiceOffering existingServiceOffering = 
serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
+            if (existingServiceOffering == null) {
+                logAndThrow(Level.ERROR, String.format("Scaling Kubernetes 
cluster : %s failed, service offering for the Kubernetes cluster not found!", 
kubernetesCluster.getName()));
+            }
         }
+
         final boolean autoscalingChanged = isAutoscalingChanged();
-        final boolean serviceOfferingScalingNeeded = serviceOffering != null 
&& serviceOffering.getId() != existingServiceOffering.getId();
+        boolean hasDefaultOffering = 
serviceOfferingNodeTypeMap.containsKey(DEFAULT.name());
+        Long existingDefaultOfferingId = 
kubernetesCluster.getServiceOfferingId();
+        ServiceOffering defaultServiceOffering = 
serviceOfferingNodeTypeMap.getOrDefault(DEFAULT.name(), null);
+
+        for (KubernetesClusterNodeType nodeType : Arrays.asList(CONTROL, ETCD, 
WORKER)) {
+            boolean isWorkerNodeOrAllNodes = WORKER == nodeType;
+            final long newVMRequired = (!isWorkerNodeOrAllNodes || clusterSize 
== null) ? 0 : clusterSize - originalClusterSize;
+            if (!hasDefaultOffering && 
!serviceOfferingNodeTypeMap.containsKey(nodeType.name()) && newVMRequired == 0) 
{
+                continue;
+            }
 
-        if (autoscalingChanged) {
-            boolean autoScaled = autoscaleCluster(this.isAutoscalingEnabled, 
minSize, maxSize);
-            if (autoScaled && serviceOfferingScalingNeeded) {
-                scaleKubernetesClusterOffering();
+            boolean serviceOfferingScalingNeeded = 
isServiceOfferingScalingNeededForNodeType(nodeType, serviceOfferingNodeTypeMap, 
kubernetesCluster, existingDefaultOfferingId);
+            ServiceOffering serviceOffering = 
serviceOfferingNodeTypeMap.getOrDefault(nodeType.name(), 
defaultServiceOffering);
+            boolean updateNodeOffering = 
serviceOfferingNodeTypeMap.containsKey(nodeType.name());
+            boolean updateClusterOffering = isWorkerNodeOrAllNodes && 
hasDefaultOffering;
+            if (isWorkerNodeOrAllNodes && autoscalingChanged) {
+                boolean autoScaled = 
autoscaleCluster(this.isAutoscalingEnabled, minSize, maxSize);
+                if (autoScaled && serviceOfferingScalingNeeded) {
+                    scaleKubernetesClusterOffering(nodeType, serviceOffering, 
updateNodeOffering, updateClusterOffering);
+                }
+                stateTransitTo(kubernetesCluster.getId(), 
KubernetesCluster.Event.OperationSucceeded);
+                return autoScaled;
             }
-            stateTransitTo(kubernetesCluster.getId(), 
KubernetesCluster.Event.OperationSucceeded);
-            return autoScaled;
-        }
-        final boolean clusterSizeScalingNeeded = clusterSize != null && 
clusterSize != originalClusterSize;
-        final long newVMRequired = clusterSize == null ? 0 : clusterSize - 
originalClusterSize;
-        if (serviceOfferingScalingNeeded && clusterSizeScalingNeeded) {
-            if (newVMRequired > 0) {
-                scaleKubernetesClusterOffering();
-                scaleKubernetesClusterSize();
-            } else {
-                scaleKubernetesClusterSize();
-                scaleKubernetesClusterOffering();
+            final boolean clusterSizeScalingNeeded = isWorkerNodeOrAllNodes && 
clusterSize != null && clusterSize != originalClusterSize;
+            if (serviceOfferingScalingNeeded && clusterSizeScalingNeeded) {
+                if (newVMRequired > 0) {
+                    scaleKubernetesClusterOffering(nodeType, serviceOffering, 
updateNodeOffering, updateClusterOffering);
+                    scaleKubernetesClusterSize(nodeType);
+                } else {
+                    scaleKubernetesClusterSize(nodeType);
+                    scaleKubernetesClusterOffering(nodeType, serviceOffering, 
updateNodeOffering, updateClusterOffering);
+                }
+            } else if (serviceOfferingScalingNeeded) {
+                scaleKubernetesClusterOffering(nodeType, serviceOffering, 
updateNodeOffering, updateClusterOffering);
+            } else if (clusterSizeScalingNeeded) {
+                scaleKubernetesClusterSize(nodeType);
             }
-        } else if (serviceOfferingScalingNeeded) {
-            scaleKubernetesClusterOffering();
-        } else if (clusterSizeScalingNeeded) {
-            scaleKubernetesClusterSize();
         }
+
         stateTransitTo(kubernetesCluster.getId(), 
KubernetesCluster.Event.OperationSucceeded);
         return true;
     }
+
+    protected boolean 
isServiceOfferingScalingNeededForNodeType(KubernetesClusterNodeType nodeType,
+                                                                Map<String, 
ServiceOffering> map, KubernetesCluster kubernetesCluster,
+                                                                Long 
existingDefaultOfferingId) {
+        Long existingOfferingId = map.containsKey(DEFAULT.name()) ?
+                existingDefaultOfferingId :
+                getExistingOfferingIdForNodeType(nodeType, kubernetesCluster);
+        if (existingOfferingId == null) {
+            logAndThrow(Level.ERROR, String.format("The Kubernetes cluster %s 
does not have a global service offering set", kubernetesCluster.getName()));
+        }
+        ServiceOffering existingOffering = 
serviceOfferingDao.findById(existingOfferingId);
+        if (existingOffering == null) {
+            logAndThrow(Level.ERROR, String.format("Cannot find the global 
service offering with ID %s set on the Kubernetes cluster %s", 
existingOfferingId, kubernetesCluster.getName()));
+        }
+        ServiceOffering newOffering = map.containsKey(DEFAULT.name()) ? 
map.get(DEFAULT.name()) : map.get(nodeType.name());
+        return newOffering != null && newOffering.getId() != 
existingOffering.getId();
+    }
+
+    protected Long getExistingOfferingIdForNodeType(KubernetesClusterNodeType 
nodeType, KubernetesCluster kubernetesCluster) {
+        Long offeringId = null;
+        if (WORKER == nodeType) {
+            offeringId = kubernetesCluster.getWorkerServiceOfferingId();
+        } else if (CONTROL == nodeType) {
+            offeringId = kubernetesCluster.getControlServiceOfferingId();
+        } else if (ETCD == nodeType) {
+            offeringId = kubernetesCluster.getEtcdServiceOfferingId();
+        }
+        if (offeringId == null) {
+            offeringId = kubernetesCluster.getServiceOfferingId();
+        }
+        return offeringId;
+    }

Review Comment:
   Imagine a scenario in which the k8s cluster is created specifying only the 
service offerings for each k8s plane. Later, if the user scales it, only 
changing the default service offering (only specifying the `serviceOfferingId` 
parameter), then the offerings of the k8s cluster VMs will be updated according 
to the offerings specified in the `scaleKubernetesCluster` API call.
   
   However, the `worker_service_offering_id` and `control_service_offering_id` 
attributes/columns of the cluster will still reference the old offerings that 
were set during the creation of the cluster. Therefore, in this scenario, this 
method will return an inconsistent offering ID.
   
   A workaround would be to fetch from the DB the k8s cluster VMs of a given 
plane. The method would then return the ID of the service offering of that 
plane's VMs (`UserVmVO`).
   ```suggestion
       protected Long 
getExistingOfferingIdForNodeType(KubernetesClusterNodeType nodeType, 
KubernetesCluster kubernetesCluster) {
           List<KubernetesClusterVmMapVO> clusterVms = 
kubernetesClusterVmMapDao.listByClusterIdAndVmType(kubernetesCluster.getId(), 
nodeType);
           if (CollectionUtils.isEmpty(clusterVms)) {
               return null;
           }
   
           KubernetesClusterVmMapVO clusterVm = clusterVms.get(0);
           UserVmVO clusterUserVm = userVmDao.findById(clusterVm.getVmId());
           if (clusterUserVm == null) {
               return null;
           }
   
           return clusterUserVm.getServiceOfferingId();
       }
   ```
   



##########
plugins/integrations/kubernetes-service/src/main/java/org/apache/cloudstack/api/command/user/kubernetes/cluster/CreateKubernetesClusterCmd.java:
##########
@@ -81,7 +111,23 @@ public class CreateKubernetesClusterCmd extends 
BaseAsyncCreateCmd {
     @ACL(accessType = AccessType.UseEntry)
     @Parameter(name = ApiConstants.SERVICE_OFFERING_ID, type = 
CommandType.UUID, entityType = ServiceOfferingResponse.class,
             description = "the ID of the service offering for the virtual 
machines in the cluster.")
-    private Long serviceOfferingId;
+    protected Long serviceOfferingId;
+
+    @ACL(accessType = AccessType.UseEntry)
+    @Parameter(name = ApiConstants.NODE_TYPE_OFFERING_MAP, type = 
CommandType.MAP,
+            description = "(Optional) Node Type to Service Offering ID 
mapping. If provided, it overrides the serviceofferingid parameter")
+    protected Map<String, Map<String, String>> serviceOfferingNodeTypeMap;
+
+    @ACL(accessType = AccessType.UseEntry)
+    @Parameter(name = ApiConstants.NODE_TYPE_TEMPLATE_MAP, type = 
CommandType.MAP,
+            description = "(Optional) Node Type to Template ID mapping. If 
provided, it overrides the default template: System VM template")
+    protected Map<String, Map<String, String>> templateNodeTypeMap;
+
+    @ACL(accessType = AccessType.UseEntry)
+    @Parameter(name = ApiConstants.ETCD_NODES, type = CommandType.LONG,
+            description = "(Optional) Number of Kubernetes cluster etcd nodes, 
default is 0." +
+                    "In case the number is greater than 0, etcd nodes are 
separate from master nodes and are provisioned accordingly")
+    protected Long etcdNodes;

Review Comment:
   Is it necessary to define these attributes as `protected`?



##########
plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/actionworkers/KubernetesClusterScaleWorker.java:
##########
@@ -446,37 +528,86 @@ public boolean scaleCluster() throws 
CloudRuntimeException {
         }
         scaleTimeoutTime = System.currentTimeMillis() + 
KubernetesClusterService.KubernetesClusterScaleTimeout.value() * 1000;
         final long originalClusterSize = kubernetesCluster.getNodeCount();
-        final ServiceOffering existingServiceOffering = 
serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
-        if (existingServiceOffering == null) {
-            logAndThrow(Level.ERROR, String.format("Scaling Kubernetes cluster 
%s failed, service offering for the Kubernetes cluster not found!", 
kubernetesCluster));
+        if (serviceOfferingNodeTypeMap.containsKey(DEFAULT.name())) {
+            final ServiceOffering existingServiceOffering = 
serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
+            if (existingServiceOffering == null) {
+                logAndThrow(Level.ERROR, String.format("Scaling Kubernetes 
cluster : %s failed, service offering for the Kubernetes cluster not found!", 
kubernetesCluster.getName()));
+            }
         }
+
         final boolean autoscalingChanged = isAutoscalingChanged();
-        final boolean serviceOfferingScalingNeeded = serviceOffering != null 
&& serviceOffering.getId() != existingServiceOffering.getId();
+        boolean hasDefaultOffering = 
serviceOfferingNodeTypeMap.containsKey(DEFAULT.name());
+        Long existingDefaultOfferingId = 
kubernetesCluster.getServiceOfferingId();
+        ServiceOffering defaultServiceOffering = 
serviceOfferingNodeTypeMap.getOrDefault(DEFAULT.name(), null);
+
+        for (KubernetesClusterNodeType nodeType : Arrays.asList(CONTROL, ETCD, 
WORKER)) {
+            boolean isWorkerNodeOrAllNodes = WORKER == nodeType;
+            final long newVMRequired = (!isWorkerNodeOrAllNodes || clusterSize 
== null) ? 0 : clusterSize - originalClusterSize;
+            if (!hasDefaultOffering && 
!serviceOfferingNodeTypeMap.containsKey(nodeType.name()) && newVMRequired == 0) 
{
+                continue;
+            }
 
-        if (autoscalingChanged) {
-            boolean autoScaled = autoscaleCluster(this.isAutoscalingEnabled, 
minSize, maxSize);
-            if (autoScaled && serviceOfferingScalingNeeded) {
-                scaleKubernetesClusterOffering();
+            boolean serviceOfferingScalingNeeded = 
isServiceOfferingScalingNeededForNodeType(nodeType, serviceOfferingNodeTypeMap, 
kubernetesCluster, existingDefaultOfferingId);
+            ServiceOffering serviceOffering = 
serviceOfferingNodeTypeMap.getOrDefault(nodeType.name(), 
defaultServiceOffering);
+            boolean updateNodeOffering = 
serviceOfferingNodeTypeMap.containsKey(nodeType.name());
+            boolean updateClusterOffering = isWorkerNodeOrAllNodes && 
hasDefaultOffering;
+            if (isWorkerNodeOrAllNodes && autoscalingChanged) {
+                boolean autoScaled = 
autoscaleCluster(this.isAutoscalingEnabled, minSize, maxSize);
+                if (autoScaled && serviceOfferingScalingNeeded) {
+                    scaleKubernetesClusterOffering(nodeType, serviceOffering, 
updateNodeOffering, updateClusterOffering);
+                }
+                stateTransitTo(kubernetesCluster.getId(), 
KubernetesCluster.Event.OperationSucceeded);
+                return autoScaled;
             }
-            stateTransitTo(kubernetesCluster.getId(), 
KubernetesCluster.Event.OperationSucceeded);
-            return autoScaled;
-        }
-        final boolean clusterSizeScalingNeeded = clusterSize != null && 
clusterSize != originalClusterSize;
-        final long newVMRequired = clusterSize == null ? 0 : clusterSize - 
originalClusterSize;
-        if (serviceOfferingScalingNeeded && clusterSizeScalingNeeded) {
-            if (newVMRequired > 0) {
-                scaleKubernetesClusterOffering();
-                scaleKubernetesClusterSize();
-            } else {
-                scaleKubernetesClusterSize();
-                scaleKubernetesClusterOffering();
+            final boolean clusterSizeScalingNeeded = isWorkerNodeOrAllNodes && 
clusterSize != null && clusterSize != originalClusterSize;
+            if (serviceOfferingScalingNeeded && clusterSizeScalingNeeded) {
+                if (newVMRequired > 0) {
+                    scaleKubernetesClusterOffering(nodeType, serviceOffering, 
updateNodeOffering, updateClusterOffering);
+                    scaleKubernetesClusterSize(nodeType);
+                } else {
+                    scaleKubernetesClusterSize(nodeType);
+                    scaleKubernetesClusterOffering(nodeType, serviceOffering, 
updateNodeOffering, updateClusterOffering);
+                }
+            } else if (serviceOfferingScalingNeeded) {
+                scaleKubernetesClusterOffering(nodeType, serviceOffering, 
updateNodeOffering, updateClusterOffering);
+            } else if (clusterSizeScalingNeeded) {
+                scaleKubernetesClusterSize(nodeType);
             }
-        } else if (serviceOfferingScalingNeeded) {
-            scaleKubernetesClusterOffering();
-        } else if (clusterSizeScalingNeeded) {
-            scaleKubernetesClusterSize();
         }
+
         stateTransitTo(kubernetesCluster.getId(), 
KubernetesCluster.Event.OperationSucceeded);
         return true;
     }
+
+    protected boolean 
isServiceOfferingScalingNeededForNodeType(KubernetesClusterNodeType nodeType,
+                                                                Map<String, 
ServiceOffering> map, KubernetesCluster kubernetesCluster,
+                                                                Long 
existingDefaultOfferingId) {
+        Long existingOfferingId = map.containsKey(DEFAULT.name()) ?
+                existingDefaultOfferingId :
+                getExistingOfferingIdForNodeType(nodeType, kubernetesCluster);

Review Comment:
   Similar to the suggestion on the `getExistingOfferingIdForNodeType` method, 
imagine that the k8s cluster only has a `control_service_offering_id` and a 
`worker_service_offering_id` defined. The `existingDefaultOfferingId` 
parameter would then be `null`. If the user tries to scale the k8s cluster 
specifying only the default service offering, the local variable 
`existingOfferingId` would also be `null`, leading to an error 
message.
   
   Therefore, calling the `getExistingOfferingIdForNodeType` method alone is 
sufficient to retrieve the existing offering ID of a `nodeType`, and the 
`existingDefaultOfferingId` parameter can be 
removed as well.
   
   ```suggestion
           Long existingOfferingId = getExistingOfferingIdForNodeType(nodeType, 
kubernetesCluster);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cloudstack.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to