This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
     new 02df2032 [YUNIKORN-3053] Support latest InPlacePodVerticalScaling spec (#961)
02df2032 is described below

commit 02df203223c7c6a464cfd0296309d4285843727b
Author: Craig Condit <ccon...@apache.org>
AuthorDate: Thu Apr 3 12:17:19 2025 -0500

    [YUNIKORN-3053] Support latest InPlacePodVerticalScaling spec (#961)

    Implement new version of the InPlacePodVerticalScaling spec introduced in
    Kubernetes 1.32 and promoted to beta in 1.33.

    Closes: #961
---
 pkg/common/constants/constants.go          |   4 +
 pkg/common/resource.go                     | 110 +++++++++++--------
 pkg/common/resource_test.go                | 168 +++++++++++------------------
 pkg/plugin/predicates/predicate_manager.go |   6 +-
 scripts/kind-1.32.yaml                     |   1 +
 5 files changed, 138 insertions(+), 151 deletions(-)

diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go
index 09d9ea13..4ebfdc56 100644
--- a/pkg/common/constants/constants.go
+++ b/pkg/common/constants/constants.go
@@ -30,6 +30,10 @@ const False = "false"
 const Label = "label"
 const Annotation = "annotation"
 
+// KEP-1287 (in-place update of pod resources)
+const PodStatusPodResizing = "PodResizing"
+const PodStatusPodResizePending = "PodResizePending"
+
 // Cluster
 const DefaultNodeAttributeHostNameKey = "si.io/hostname"
 const DefaultNodeAttributeRackNameKey = "si.io/rackname"
diff --git a/pkg/common/resource.go b/pkg/common/resource.go
index 36c8c259..7f06cfba 100644
--- a/pkg/common/resource.go
+++ b/pkg/common/resource.go
@@ -24,6 +24,7 @@ import (
     "k8s.io/apimachinery/pkg/api/resource"
     helpers "k8s.io/component-helpers/resource"
 
+    "github.com/apache/yunikorn-k8shim/pkg/common/constants"
     "github.com/apache/yunikorn-k8shim/pkg/log"
     siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
     "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -57,16 +58,28 @@ func GetPodResource(pod *v1.Pod) (resource *si.Resource) {
         Resources: map[string]*si.Quantity{"pods": {Value: 1}},
     }
 
-    count := len(pod.Spec.Containers)
-    for i := 0; i < count; i++ {
-        podResource = Add(podResource, containerResource(pod, i))
+    // Init container statuses and pod statuses are reported in separate places, so build a fixed map here to make
+    // this easier to compute. This is ever so slightly more resource-intensive than computing inline, but is far more
+    // readable and less error prone. Since resource calculation is only done once per pod update, this overhead is
+    // negligible.
+    containerStatuses := make(map[string]*v1.ContainerStatus, len(pod.Status.ContainerStatuses)+len(pod.Status.InitContainerStatuses))
+    for idx := range pod.Status.ContainerStatuses {
+        containerStatuses[pod.Status.ContainerStatuses[idx].Name] = &pod.Status.ContainerStatuses[idx]
+    }
+    for idx := range pod.Status.InitContainerStatuses {
+        containerStatuses[pod.Status.InitContainerStatuses[idx].Name] = &pod.Status.InitContainerStatuses[idx]
+    }
+
+    // Add usage for each container
+    for _, container := range pod.Spec.Containers {
+        podResource = Add(podResource, computeContainerResource(pod, &container, containerStatuses))
     }
 
     // each resource compare between initcontainer and sum of containers
     // InitContainers(i) requirement=sum(Sidecar requirement i-1)+InitContainer(i) request
     // max(sum(Containers requirement)+sum(Sidecar requirement), InitContainer(i) requirement)
-    if pod.Spec.InitContainers != nil {
-        podResource = checkInitContainerRequest(pod, podResource)
+    if len(pod.Spec.InitContainers) > 0 {
+        podResource = checkInitContainerRequest(pod, podResource, containerStatuses)
     }
 
     // PodLevelResources feature:
@@ -95,71 +108,80 @@ func GetPodResource(pod *v1.Pod) (resource *si.Resource) {
     return podResource
 }
 
-func containerResource(pod *v1.Pod, i int) (resource *si.Resource) {
-    // K8s pod InPlacePodVerticalScaling from:
-    // alpha: v1.27
-    // beta: v1.31?
-    // If AllocatedResources are present, these need to be used in preference to pod resource requests.
-    // Additionally, if the Resize pod status is Proposed, then the maximum of the request and allocated values need
-    // to be used.
-    requested := pod.Spec.Containers[i].Resources.Requests
-    if len(pod.Status.ContainerStatuses) == 0 {
-        return getResource(requested)
-    }
-    allocated := pod.Status.ContainerStatuses[i].AllocatedResources
-    if len(allocated) == 0 {
-        // no allocatedResources present, use requested
-        return getResource(requested)
-    }
-    if pod.Status.Resize == v1.PodResizeStatusProposed {
-        // resize proposed, be pessimistic and use larger of requested and allocated
-        return getMaxResource(requested, allocated)
-    }
-    // use allocated
-    return getResource(allocated)
+// computeContainerResource computes the max(spec...resources, status...allocatedResources, status...resources)
+// per KEP-1287 (in-place update of pod resources), unless resize status is PodResizeStatusInfeasible.
+func computeContainerResource(pod *v1.Pod, container *v1.Container, containerStatuses map[string]*v1.ContainerStatus) *si.Resource {
+    combined := &si.Resource{Resources: make(map[string]*si.Quantity)}
+    updateMax(combined, getResource(container.Resources.Requests))
+    if containerStatus := containerStatuses[container.Name]; containerStatus != nil {
+        if isResizeInfeasible(pod) && containerStatus.Resources != nil {
+            // resize request was denied; use container status requests as current value
+            return getResource(containerStatus.Resources.Requests)
+        }
+        updateMax(combined, getResource(containerStatus.AllocatedResources))
+        if containerStatus.Resources != nil {
+            updateMax(combined, getResource(containerStatus.Resources.Requests))
+        }
+    }
+    return combined
+}
+
+// isResizeInfeasible determines if a currently in-progress pod resize is infeasible. This takes into account both the
+// current pod.Status.Resize field as well as the upcoming PodResizePending pod condition (in spec, but not yet
+// implemented).
+func isResizeInfeasible(pod *v1.Pod) bool {
+    if pod.Status.Resize == v1.PodResizeStatusInfeasible {
+        return true
+    }
+    for _, condition := range pod.Status.Conditions {
+        if condition.Type == constants.PodStatusPodResizePending && condition.Reason == string(v1.PodResizeStatusInfeasible) {
+            return true
+        }
+    }
+    return false
 }
 
-func getMaxResource(left v1.ResourceList, right v1.ResourceList) *si.Resource {
-    combined := getResource(left)
-    rightRes := getResource(right)
-    for key, rValue := range rightRes.Resources {
-        lValue, ok := combined.Resources[key]
+// updateMax merges two resource lists into the leftmost, taking the max values of each resource type.
+func updateMax(left *si.Resource, right *si.Resource) {
+    // short-circuit out if empty
+    if right == nil {
+        return
+    }
+    for key, rValue := range right.GetResources() {
+        lValue, ok := left.Resources[key]
         if !ok {
             // add new resource from right
-            combined.Resources[key] = rValue
+            left.Resources[key] = rValue
             continue
         }
         if rValue.GetValue() > lValue.GetValue() {
             // update resource with larger right value
-            combined.Resources[key] = rValue
+            left.Resources[key] = rValue
         }
     }
-    return combined
 }
 
-func checkInitContainerRequest(pod *v1.Pod, containersResources *si.Resource) *si.Resource {
+func checkInitContainerRequest(pod *v1.Pod, containersResources *si.Resource, containerStatuses map[string]*v1.ContainerStatus) *si.Resource {
     updatedRes := containersResources
 
     // update total pod resource usage with sidecar containers
     for _, c := range pod.Spec.InitContainers {
-        if isSideCarContainer(c) {
-            resourceList := c.Resources.Requests
-            sideCarResources := getResource(resourceList)
+        if isSideCarContainer(&c) {
+            sideCarResources := computeContainerResource(pod, &c, containerStatuses)
             updatedRes = Add(updatedRes, sideCarResources)
         }
     }
 
     var sideCarRequests *si.Resource // cumulative value of sidecar requests so far
     for _, c := range pod.Spec.InitContainers {
-        resourceList := c.Resources.Requests
-        ICResource := getResource(resourceList)
-        if isSideCarContainer(c) {
+        ICResource := computeContainerResource(pod, &c, containerStatuses)
+        if isSideCarContainer(&c) {
             sideCarRequests = Add(sideCarRequests, ICResource)
         }
         ICResource = Add(ICResource, sideCarRequests)
         for resourceName, ICRequest := range ICResource.Resources {
             containersRequests, exist := updatedRes.Resources[resourceName]
-            // addtional resource request from init cont, add it to request.
+            // additional resource request from init cont, add it to request.
             if !exist {
                 updatedRes.Resources[resourceName] = ICRequest
                 continue
@@ -173,7 +195,7 @@ func checkInitContainerRequest(pod *v1.Pod, containersResources *si.Resource) *s
     return updatedRes
 }
 
-func isSideCarContainer(c v1.Container) bool {
+func isSideCarContainer(c *v1.Container) bool {
     return c.RestartPolicy != nil && *c.RestartPolicy == v1.ContainerRestartPolicyAlways
 }
 
diff --git a/pkg/common/resource_test.go b/pkg/common/resource_test.go
index 2d1d0197..c415f201 100644
--- a/pkg/common/resource_test.go
+++ b/pkg/common/resource_test.go
@@ -27,6 +27,7 @@ import (
     apis "k8s.io/apimachinery/pkg/apis/meta/v1"
     k8res "k8s.io/component-helpers/resource"
 
+    "github.com/apache/yunikorn-k8shim/pkg/common/constants"
     "github.com/apache/yunikorn-k8shim/pkg/plugin/predicates"
     siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
     "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -441,41 +442,6 @@ func TestGetPodResourcesWithInPlacePodVerticalScaling(t *testing.T) {
     // ensure required K8s feature gates are enabled
     predicates.EnableOptionalKubernetesFeatureGates()
 
-    containers := make([]v1.Container, 0)
-
-    // container 01
-    c1Resources := make(map[v1.ResourceName]resource.Quantity)
-    c1Resources[v1.ResourceMemory] = resource.MustParse("500M")
-    c1Resources[v1.ResourceCPU] = resource.MustParse("1")
-    c1Resources["nvidia.com/gpu"] = resource.MustParse("1")
-    c1Allocated := make(map[v1.ResourceName]resource.Quantity)
-    c1Allocated[v1.ResourceMemory] = resource.MustParse("500M")
-    c1Allocated[v1.ResourceCPU] = resource.MustParse("1")
-    c1Allocated["nvidia.com/gpu"] = resource.MustParse("1")
-    containers = append(containers, v1.Container{
-        Name: "container-01",
-        Resources: v1.ResourceRequirements{
-            Requests: c1Resources,
-        },
-    })
-
-    // container 02
-    c2Resources := make(map[v1.ResourceName]resource.Quantity)
-    c2Resources[v1.ResourceMemory] = resource.MustParse("1024M")
-    c2Resources[v1.ResourceCPU] = resource.MustParse("2")
-    c2Resources["nvidia.com/gpu"] = resource.MustParse("4")
-    c2Allocated := make(map[v1.ResourceName]resource.Quantity)
-    c2Allocated[v1.ResourceMemory] = resource.MustParse("1024M")
-    c2Allocated[v1.ResourceCPU] = resource.MustParse("2")
-    c2Allocated["nvidia.com/gpu"] = resource.MustParse("4")
-    containers = append(containers, v1.Container{
-        Name: "container-02",
-        Resources: v1.ResourceRequirements{
-            Requests: c2Resources,
-        },
-    })
-
-    // pod
     pod := &v1.Pod{
         TypeMeta: apis.TypeMeta{
             Kind: "Pod",
@@ -486,7 +452,26 @@ func TestGetPodResourcesWithInPlacePodVerticalScaling(t *testing.T) {
             UID: "UID-00001",
         },
         Spec: v1.PodSpec{
-            Containers: containers,
+            Containers: []v1.Container{
+                {
+                    Name: "container-01",
+                    Resources: v1.ResourceRequirements{
+                        Requests: map[v1.ResourceName]resource.Quantity{
+                            v1.ResourceMemory: resource.MustParse("1000M"),
+                            v1.ResourceCPU:    resource.MustParse("1"),
+                        },
+                    },
+                },
+                {
+                    Name: "container-02",
+                    Resources: v1.ResourceRequirements{
+                        Requests: map[v1.ResourceName]resource.Quantity{
+                            v1.ResourceMemory: resource.MustParse("2000M"),
+                            v1.ResourceCPU:    resource.MustParse("2"),
+                        },
+                    },
+                },
+            },
         },
         Status: v1.PodStatus{
             ContainerStatuses: nil,
@@ -496,110 +481,81 @@ func TestGetPodResourcesWithInPlacePodVerticalScaling(t *testing.T) {
     // verify we get aggregated resource from containers
     res := GetPodResource(pod)
-    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(1524*1000*1000))
+    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(3000*1000*1000))
     assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(3000))
-    assert.Equal(t, res.Resources["nvidia.com/gpu"].GetValue(), int64(5))
     assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
 
     // validate with empty ContainerStatuses
     pod.Status.ContainerStatuses = []v1.ContainerStatus{}
     res = GetPodResource(pod)
-    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(1524*1000*1000))
+    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(3000*1000*1000))
     assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(3000))
-    assert.Equal(t, res.Resources["nvidia.com/gpu"].GetValue(), int64(5))
     assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
 
     // validate with empty resources
     pod.Status.ContainerStatuses = []v1.ContainerStatus{
-        {AllocatedResources: nil},
-        {AllocatedResources: nil},
+        {Name: "container-01", AllocatedResources: nil, Resources: nil},
+        {Name: "container-02", AllocatedResources: nil, Resources: nil},
     }
     res = GetPodResource(pod)
-    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(1524*1000*1000))
+    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(3000*1000*1000))
     assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(3000))
-    assert.Equal(t, res.Resources["nvidia.com/gpu"].GetValue(), int64(5))
     assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
 
     // simulate the pod moving to running state by setting assigned resources to the same values
     pod.Status.ContainerStatuses = []v1.ContainerStatus{
-        {AllocatedResources: c1Allocated},
-        {AllocatedResources: c2Allocated},
+        {Name: "container-01", AllocatedResources: pod.Spec.Containers[0].Resources.Requests.DeepCopy(), Resources: pod.Spec.Containers[0].Resources.DeepCopy()},
+        {Name: "container-02", AllocatedResources: pod.Spec.Containers[1].Resources.Requests.DeepCopy(), Resources: pod.Spec.Containers[1].Resources.DeepCopy()},
     }
     res = GetPodResource(pod)
-    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(1524*1000*1000))
+    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(3000*1000*1000))
     assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(3000))
-    assert.Equal(t, res.Resources["nvidia.com/gpu"].GetValue(), int64(5))
     assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
 
-    // simulate a completed pod resize upwards
-    c1Allocated[v1.ResourceMemory] = resource.MustParse("1000M")
-    c1Allocated[v1.ResourceCPU] = resource.MustParse("2")
-    c2Allocated[v1.ResourceMemory] = resource.MustParse("2048M")
-    c2Allocated[v1.ResourceCPU] = resource.MustParse("4")
-    pod.Status.ContainerStatuses = []v1.ContainerStatus{
-        {AllocatedResources: c1Allocated},
-        {AllocatedResources: c2Allocated},
-    }
+    // simulate a proposed pod resize (memory up, cpu down)
+    pod.Status.Resize = v1.PodResizeStatusProposed
+    pod.Spec.Containers[0].Resources.Requests[v1.ResourceMemory] = resource.MustParse("2000M")
+    pod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = resource.MustParse("500m")
+    pod.Spec.Containers[1].Resources.Requests[v1.ResourceMemory] = resource.MustParse("4000M")
+    pod.Spec.Containers[1].Resources.Requests[v1.ResourceCPU] = resource.MustParse("1")
     res = GetPodResource(pod)
-    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(3048*1000*1000))
-    assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(6000))
-    assert.Equal(t, res.Resources["nvidia.com/gpu"].GetValue(), int64(5))
+    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(6000*1000*1000))
+    assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(3000))
     assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
 
-    // simulate a completed pod resize downwards
-    c1Allocated[v1.ResourceMemory] = resource.MustParse("250M")
-    c1Allocated[v1.ResourceCPU] = resource.MustParse("500m")
-    c2Allocated[v1.ResourceMemory] = resource.MustParse("512M")
-    c2Allocated[v1.ResourceCPU] = resource.MustParse("1")
-    pod.Status.ContainerStatuses = []v1.ContainerStatus{
-        {AllocatedResources: c1Allocated},
-        {AllocatedResources: c2Allocated},
-    }
+    // simulate an infeasible pod resize
+    pod.Status.Resize = v1.PodResizeStatusInfeasible
    res = GetPodResource(pod)
-    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(762*1000*1000))
-    assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(1500))
-    assert.Equal(t, res.Resources["nvidia.com/gpu"].GetValue(), int64(5))
+    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(3000*1000*1000))
+    assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(3000))
     assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
 
-    // simulate a proposed resize, some up, some down
-    c1Allocated[v1.ResourceMemory] = resource.MustParse("250M")
-    c1Allocated[v1.ResourceCPU] = resource.MustParse("2")
-    c2Allocated[v1.ResourceMemory] = resource.MustParse("2048M")
-    c2Allocated[v1.ResourceCPU] = resource.MustParse("1")
-    pod.Status.Resize = v1.PodResizeStatusProposed
-    pod.Status.ContainerStatuses = []v1.ContainerStatus{
-        {AllocatedResources: c1Allocated},
-        {AllocatedResources: c2Allocated},
-    }
+    // same, but using conditions only (simulates post-alpha behavior)
+    pod.Status.Resize = ""
+    pod.Status.Conditions = []v1.PodCondition{{Type: constants.PodStatusPodResizePending, Status: v1.ConditionTrue, Reason: string(v1.PodResizeStatusInfeasible)}}
     res = GetPodResource(pod)
-    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(2548*1000*1000))
-    assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(4000))
-    assert.Equal(t, res.Resources["nvidia.com/gpu"].GetValue(), int64(5))
+    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(3000*1000*1000))
+    assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(3000))
     assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
 
-    // case: requested resource types are fewer than allocated types
-    containers = make([]v1.Container, 0)
-    c1Resources = make(map[v1.ResourceName]resource.Quantity)
-    containers = append(containers, v1.Container{
-        Name: "container-01",
-        Resources: v1.ResourceRequirements{
-            Requests: c1Resources,
-        },
-    })
-    pod.Spec.Containers = containers
-
-    c1Allocated[v1.ResourceMemory] = resource.MustParse("500M")
-    c1Allocated[v1.ResourceCPU] = resource.MustParse("2")
-    pod.Status.ContainerStatuses = []v1.ContainerStatus{
-        {AllocatedResources: c1Allocated},
-        {AllocatedResources: c2Allocated},
-    }
-    pod.Status.Resize = v1.PodResizeStatusProposed
+    // simulate an in-progress pod resize
+    pod.Status.Resize = v1.PodResizeStatusInProgress
+    pod.Status.Conditions = []v1.PodCondition{{Type: constants.PodStatusPodResizing, Status: constants.True}}
+    pod.Status.ContainerStatuses[0].AllocatedResources = pod.Spec.Containers[0].Resources.Requests.DeepCopy()
+    pod.Status.ContainerStatuses[1].AllocatedResources = pod.Spec.Containers[1].Resources.Requests.DeepCopy()
+    res = GetPodResource(pod)
+    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(6000*1000*1000))
+    assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(3000))
+    assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
+
+    // simulate a completed pod resize
+    pod.Status.Resize = ""
+    pod.Status.Conditions = []v1.PodCondition{}
+    pod.Status.ContainerStatuses[0].Resources = pod.Spec.Containers[0].Resources.DeepCopy()
+    pod.Status.ContainerStatuses[1].Resources = pod.Spec.Containers[1].Resources.DeepCopy()
     res = GetPodResource(pod)
-    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(500*1000*1000))
-    assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(2000))
-    assert.Equal(t, res.Resources["nvidia.com/gpu"].GetValue(), int64(1))
+    assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(6000*1000*1000))
+    assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(1500))
     assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
 }
diff --git a/pkg/plugin/predicates/predicate_manager.go b/pkg/plugin/predicates/predicate_manager.go
index cff56cd6..6d9d3f07 100644
--- a/pkg/plugin/predicates/predicate_manager.go
+++ b/pkg/plugin/predicates/predicate_manager.go
@@ -287,7 +287,11 @@ func EnableOptionalKubernetesFeatureGates() {
     }
     log.Log(log.ShimPredicates).Debug("Enabling InPlacePodVerticalScaling feature gate")
     if err := feature.DefaultMutableFeatureGate.Set(fmt.Sprintf("%s=true", features.InPlacePodVerticalScaling)); err != nil {
-        log.Log(log.ShimPredicates).Fatal("Unable to set PodLevelResources feature gate", zap.Error(err))
+        log.Log(log.ShimPredicates).Fatal("Unable to set InPlacePodVerticalScaling feature gate", zap.Error(err))
+    }
+    log.Log(log.ShimPredicates).Debug("Enabling InPlacePodVerticalScalingAllocatedStatus feature gate")
+    if err := feature.DefaultMutableFeatureGate.Set(fmt.Sprintf("%s=true", features.InPlacePodVerticalScalingAllocatedStatus)); err != nil {
+        log.Log(log.ShimPredicates).Fatal("Unable to set InPlacePodVerticalScalingAllocatedStatus feature gate", zap.Error(err))
     }
 }
diff --git a/scripts/kind-1.32.yaml b/scripts/kind-1.32.yaml
index 581885da..cd7a729c 100644
--- a/scripts/kind-1.32.yaml
+++ b/scripts/kind-1.32.yaml
@@ -19,6 +19,7 @@ kind: Cluster
 apiVersion: kind.x-k8s.io/v1alpha4
 featureGates:
   "InPlacePodVerticalScaling": true
+  "InPlacePodVerticalScalingAllocatedStatus": true
   "PodLevelResources": true
 nodes:
 - role: control-plane

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org
For additional commands, e-mail: issues-h...@yunikorn.apache.org
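
For illustration only (not part of the commit above): a minimal, self-contained sketch of the per-container
rule that the new computeContainerResource() applies, written directly against the core/v1 types -- take the
element-wise maximum of the spec requests, status.allocatedResources and status.resources.requests, and fall
back to the last accepted status requests when the resize is reported infeasible. Helper names
(maxRequests, effectiveRequests) are hypothetical, and the AllocatedResources / Resources status fields are
only populated on clusters with the corresponding InPlacePodVerticalScaling feature gates enabled.

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/resource"
    )

    // maxRequests returns the element-wise maximum of two resource lists.
    func maxRequests(a, b v1.ResourceList) v1.ResourceList {
        out := a.DeepCopy()
        if out == nil {
            out = v1.ResourceList{}
        }
        for name, q := range b {
            if cur, ok := out[name]; !ok || q.Cmp(cur) > 0 {
                out[name] = q
            }
        }
        return out
    }

    // effectiveRequests mirrors the commit's per-container rule: normally the max of the
    // spec requests, allocatedResources and status resources; if the resize was reported
    // infeasible, use the last accepted status requests instead.
    func effectiveRequests(spec v1.ResourceList, status v1.ContainerStatus, infeasible bool) v1.ResourceList {
        if infeasible && status.Resources != nil {
            return status.Resources.Requests
        }
        out := maxRequests(spec, status.AllocatedResources)
        if status.Resources != nil {
            out = maxRequests(out, status.Resources.Requests)
        }
        return out
    }

    func main() {
        // Pod spec after a proposed resize: CPU down (1 -> 500m), memory up (1000M -> 2000M).
        spec := v1.ResourceList{
            v1.ResourceCPU:    resource.MustParse("500m"),
            v1.ResourceMemory: resource.MustParse("2000M"),
        }
        // The kubelet still reports the previously accepted values.
        status := v1.ContainerStatus{
            AllocatedResources: v1.ResourceList{
                v1.ResourceCPU:    resource.MustParse("1"),
                v1.ResourceMemory: resource.MustParse("1000M"),
            },
        }
        eff := effectiveRequests(spec, status, false)
        // Prints "1 2000M": the scheduler stays pessimistic while the resize is still pending.
        fmt.Println(eff.Cpu().String(), eff.Memory().String())
    }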