This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new c7a44ce5 [YUNIKORN-2864] Add e2e test for InPlacePodVerticalScaling feature (#965)
c7a44ce5 is described below

commit c7a44ce535994f83e0efe2af9ab88be04e5ecfdd
Author: kaichiachen <kaichia...@gmail.com>
AuthorDate: Thu Apr 10 15:44:57 2025 +0200

    [YUNIKORN-2864] Add e2e test for InPlacePodVerticalScaling feature (#965)
    
    Closes: #965
    
    Signed-off-by: Peter Bacsko <bacs...@gmail.com>
---
 test/e2e/framework/helpers/k8s/k8s_utils.go        |  44 ++++
 .../pod_resource_scaling_suite_test.go             |  79 ++++++++
 .../pod_resource_scaling_test.go                   | 222 +++++++++++++++++++++
 3 files changed, 345 insertions(+)
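
For context on what the new helpers exercise: an in-place resize is performed by sending a JSON
patch to the pod's "resize" subresource, which is the mechanism behind the Kubernetes
InPlacePodVerticalScaling feature. A minimal client-go sketch of that call is shown below; it
mirrors the ModifyResourceUsage helper added in this commit, but the clientset, namespace and
pod name are placeholders rather than code from the patch.

    // Imports assumed: context, encoding/json,
    // metav1 "k8s.io/apimachinery/pkg/apis/meta/v1",
    // "k8s.io/apimachinery/pkg/types", "k8s.io/client-go/kubernetes".
    //
    // resizeFirstContainer replaces the first container's CPU and memory requests
    // in place by patching the pod's "resize" subresource (requires a cluster with
    // InPlacePodVerticalScaling enabled).
    func resizeFirstContainer(clientset *kubernetes.Clientset, namespace, podName, cpu, mem string) error {
            patch := []map[string]interface{}{
                    {"op": "replace", "path": "/spec/containers/0/resources/requests/cpu", "value": cpu},    // e.g. "200m"
                    {"op": "replace", "path": "/spec/containers/0/resources/requests/memory", "value": mem}, // e.g. "200Mi"
            }
            patchBytes, err := json.Marshal(patch)
            if err != nil {
                    return err
            }
            _, err = clientset.CoreV1().Pods(namespace).Patch(
                    context.TODO(), podName, types.JSONPatchType, patchBytes,
                    metav1.PatchOptions{}, "resize", // target the resize subresource
            )
            return err
    }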

diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go
index 985ef93b..e63f7ad7 100644
--- a/test/e2e/framework/helpers/k8s/k8s_utils.go
+++ b/test/e2e/framework/helpers/k8s/k8s_utils.go
@@ -18,12 +18,14 @@ package k8s
 
 import (
        "context"
+       "encoding/json"
        "errors"
        "fmt"
        "net/http"
        "net/url"
        "os"
        "path/filepath"
+       "strconv"
        "strings"
        "time"
 
@@ -41,8 +43,10 @@ import (
        storagev1 "k8s.io/api/storage/v1"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/types"
        "k8s.io/apimachinery/pkg/util/httpstream"
        "k8s.io/apimachinery/pkg/util/wait"
+       "k8s.io/apimachinery/pkg/version"
        "k8s.io/cli-runtime/pkg/genericclioptions"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
@@ -152,6 +156,15 @@ func (k *KubeCtl) GetClient() *kubernetes.Clientset {
        return k.clientSet
 }
 
+// GetKubernetesVersion returns the version info from the Kubernetes server
+func (k *KubeCtl) GetKubernetesVersion() (*version.Info, error) {
+       k8sVer, err := k.clientSet.Discovery().ServerVersion()
+       if err != nil {
+               return k8sVer, err
+       }
+       return k8sVer, nil
+}
+
 func (k *KubeCtl) GetKubeConfig() (*rest.Config, error) {
        if k.kubeConfig != nil {
                return k.kubeConfig, nil
@@ -221,6 +234,37 @@ func (k *KubeCtl) UpdatePod(pod *v1.Pod, namespace string) (*v1.Pod, error) {
        return k.clientSet.CoreV1().Pods(namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
 }
 
+func (k *KubeCtl) PatchPod(pod *v1.Pod, namespace string, patch []map[string]interface{}, subresources ...string) (*v1.Pod, error) {
+       patchBytes, err := json.Marshal(patch)
+       if err != nil {
+               return nil, err
+       }
+       return k.clientSet.CoreV1().Pods(namespace).Patch(
+               context.TODO(),
+               pod.Name,
+               types.JSONPatchType,
+               patchBytes,
+               metav1.PatchOptions{},
+               subresources...,
+       )
+}
+
+func (k *KubeCtl) ModifyResourceUsage(pod *v1.Pod, namespace string, newVcore int64, newMemory int64) (*v1.Pod, error) {
+       patch := []map[string]interface{}{
+               {
+                       "op":    "replace",
+                       "path":  "/spec/containers/0/resources/requests/cpu",
+                       "value": strconv.FormatInt(newVcore, 10) + "m",
+               },
+               {
+                       "op":    "replace",
+                       "path":  "/spec/containers/0/resources/requests/memory",
+                       "value": strconv.FormatInt(newMemory, 10) + "Mi",
+               },
+       }
+       return k.PatchPod(pod, namespace, patch, "resize")
+}
+
 func (k *KubeCtl) DeletePodAnnotation(pod *v1.Pod, namespace, annotation string) (*v1.Pod, error) {
        annotations := pod.Annotations
        delete(annotations, annotation)
diff --git a/test/e2e/pod_resource_scaling/pod_resource_scaling_suite_test.go b/test/e2e/pod_resource_scaling/pod_resource_scaling_suite_test.go
new file mode 100644
index 00000000..0049eb75
--- /dev/null
+++ b/test/e2e/pod_resource_scaling/pod_resource_scaling_suite_test.go
@@ -0,0 +1,79 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package pod_resource_scaling
+
+import (
+       "path/filepath"
+       "runtime"
+       "testing"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/ginkgo/v2/reporters"
+       "github.com/onsi/gomega"
+
+       "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
+       "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
+       "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
+       "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
+)
+
+func init() {
+       configmanager.YuniKornTestConfig.ParseFlags()
+}
+
+func TestPodResourceScaling(t *testing.T) {
+       ginkgo.ReportAfterSuite("TestPodResourceScaling", func(report ginkgo.Report) {
+               err := common.CreateJUnitReportDir()
+               Ω(err).NotTo(gomega.HaveOccurred())
+               err = reporters.GenerateJUnitReportWithConfig(
+                       report,
+                       filepath.Join(configmanager.YuniKornTestConfig.LogDir, "TEST-pod_resource_scaling_junit.xml"),
+                       reporters.JunitReportConfig{OmitSpecLabels: true},
+               )
+               Ω(err).NotTo(HaveOccurred())
+       })
+       gomega.RegisterFailHandler(ginkgo.Fail)
+       ginkgo.RunSpecs(t, "Pod Resource Scaling Suite")
+}
+
+var _ = ginkgo.BeforeSuite(func() {
+       _, filename, _, _ := runtime.Caller(0)
+       suiteName = common.GetSuiteName(filename)
+
+       // Initializing kubectl client
+       kClient = k8s.KubeCtl{}
+       Ω(kClient.SetClient()).To(gomega.BeNil())
+
+       // Initializing rest client
+       restClient = yunikorn.RClient{}
+       Ω(restClient).NotTo(gomega.BeNil())
+       yunikorn.EnsureYuniKornConfigsPresent()
+       yunikorn.UpdateConfigMapWrapper(oldConfigMap, "")
+})
+
+var _ = ginkgo.AfterSuite(func() {
+       yunikorn.RestoreConfigMapWrapper(oldConfigMap)
+})
+
+var Ω = gomega.Ω
+var HaveOccurred = gomega.HaveOccurred
+var Equal = gomega.Equal
+var By = ginkgo.By
+var BeforeEach = ginkgo.BeforeEach
+var AfterEach = ginkgo.AfterEach
diff --git a/test/e2e/pod_resource_scaling/pod_resource_scaling_test.go b/test/e2e/pod_resource_scaling/pod_resource_scaling_test.go
new file mode 100644
index 00000000..95d0fd53
--- /dev/null
+++ b/test/e2e/pod_resource_scaling/pod_resource_scaling_test.go
@@ -0,0 +1,222 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package pod_resource_scaling
+
+import (
+       "fmt"
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       v1 "k8s.io/api/core/v1"
+
+       "github.com/apache/yunikorn-k8shim/pkg/common/utils"
+       "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
+       "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
+       "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
+)
+
+var kClient k8s.KubeCtl
+var restClient yunikorn.RClient
+var err error
+var ns string
+var oldConfigMap = new(v1.ConfigMap)
+var suiteName string
+
+var _ = BeforeEach(func() {
+       // Skip if K8s version < 1.32
+       k8sVer, err := kClient.GetKubernetesVersion()
+       Ω(err).NotTo(HaveOccurred())
+       if k8sVer.Major < "1" || (k8sVer.Major == "1" && k8sVer.Minor < "32") {
+               ginkgo.Skip("InPlacePodVerticalScaling requires K8s 1.32+")
+       }
+
+       // Create test namespace
+       ns = "test-" + common.RandSeq(10)
+       _, err = kClient.CreateNamespace(ns, nil)
+       Ω(err).NotTo(HaveOccurred())
+})
+
+var _ = ginkgo.AfterEach(func() {
+       By("Killing all pods")
+       err := kClient.DeletePods(ns)
+       Ω(err).NotTo(HaveOccurred())
+       err = kClient.DeleteNamespace(ns)
+       Ω(err).NotTo(HaveOccurred())
+})
+
+func verifyYunikornResourceUsage(appID, resourceName string, value int64) {
+       err = utils.WaitForCondition(func() bool {
+               app, err := restClient.GetAppInfo("default", "root."+ns, appID)
+               if err != nil || app == nil {
+                       fmt.Println(err)
+                       return false
+               }
+
+               if app.Allocations == nil {
+                       fmt.Println(app)
+                       return false
+               }
+
+               for _, alloc := range app.Allocations {
+                       resVal, exists := alloc.ResourcePerAlloc[resourceName]
+                       if !exists {
+                               return false
+                       }
+
+                       if resVal == value {
+                               return true
+                       }
+               }
+
+               return false
+       }, 30*time.Second, 120*time.Second)
+       Ω(err).NotTo(HaveOccurred(), fmt.Sprintf("Pod should be scheduled by YuniKorn with correct resource(%s) allocation", resourceName))
+}
+
+var _ = ginkgo.Describe("InPlacePodVerticalScaling", func() {
+       ginkgo.It("Pod resources(cpu/memory) resize up", func() {
+               // Create pod with initial resources
+               sleepPodConfigs := k8s.SleepPodConfig{NS: ns, Time: 600, CPU: 100, Mem: 100, QOSClass: v1.PodQOSGuaranteed}
+               pod, err := k8s.InitSleepPod(sleepPodConfigs)
+               Ω(err).NotTo(HaveOccurred())
+
+               // Create pod
+               pod, err = kClient.CreatePod(pod, ns)
+               Ω(err).NotTo(HaveOccurred())
+
+               // Wait for pod running
+               err = kClient.WaitForPodRunning(ns, pod.Name, 60*time.Second)
+               Ω(err).NotTo(HaveOccurred())
+
+               // Check if pod is scheduled by YuniKorn and verify CPU allocation is 100m
+               verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore", 100)
+
+               // Get initial pod restart count
+               pod, err = kClient.GetPod(pod.Name, ns)
+               Ω(err).NotTo(HaveOccurred())
+               initialRestartCount := pod.Status.ContainerStatuses[0].RestartCount
+
+               pod, err = kClient.ModifyResourceUsage(pod, ns, 200, 100)
+               Ω(err).NotTo(HaveOccurred())
+
+               Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount), "Container should not have restarted")
+               verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore", 200)
+
+               pod, err = kClient.ModifyResourceUsage(pod, ns, 200, 200)
+               Ω(err).NotTo(HaveOccurred())
+
+               Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount), "Container should not have restarted")
+               verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "memory", 200*1024*1024)
+       })
+
+       ginkgo.It("Pod resources(cpu/memory) resize down", func() {
+               // Create pod with initial resources
+               sleepPodConfigs := k8s.SleepPodConfig{NS: ns, Time: 600, CPU: 200, Mem: 200, QOSClass: v1.PodQOSGuaranteed}
+               pod, err := k8s.InitSleepPod(sleepPodConfigs)
+               Ω(err).NotTo(HaveOccurred())
+
+               // Create pod
+               pod, err = kClient.CreatePod(pod, ns)
+               Ω(err).NotTo(HaveOccurred())
+
+               // Wait for pod running
+               err = kClient.WaitForPodRunning(ns, pod.Name, 60*time.Second)
+               Ω(err).NotTo(HaveOccurred())
+
+               // Check if pod is scheduled by YuniKorn and verify CPU allocation is 200m
+               verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore", 200)
+
+               // Get initial pod state
+               pod, err = kClient.GetPod(pod.Name, ns)
+               Ω(err).NotTo(HaveOccurred())
+               initialStartTime := pod.Status.StartTime
+               initialRestartCount := pod.Status.ContainerStatuses[0].RestartCount
+
+               pod, err = kClient.ModifyResourceUsage(pod, ns, 100, 200)
+               Ω(err).NotTo(HaveOccurred())
+
+               // Wait for resource update to be reflected
+               err = utils.WaitForCondition(func() bool {
+                       currentPod, err := kClient.GetPod(pod.Name, ns)
+                       if err != nil {
+                               return false
+                       }
+                       return currentPod.Spec.Containers[0].Resources.Requests.Cpu().MilliValue() == int64(100)
+               }, 10*time.Second, 120*time.Second)
+               Ω(err).NotTo(HaveOccurred())
+
+               Ω(err).NotTo(HaveOccurred())
+               Ω(pod.Status.StartTime).To(Equal(initialStartTime), "Pod should not have restarted")
+               Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount), "Container should not have restarted")
+               verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore", 100)
+
+               pod, err = kClient.ModifyResourceUsage(pod, ns, 100, 100)
+               Ω(err).NotTo(HaveOccurred()) // The patch lowering the memory request should be accepted
+
+               Ω(err).NotTo(HaveOccurred())
+               Ω(pod.Status.StartTime).To(Equal(initialStartTime), "Pod should not have restarted")
+               Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount), "Container should not have restarted")
+               verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "memory", 100*1024*1024)
+       })
+
+       ginkgo.It("Pod resources(cpu/memory) resize to excessive values should fail", func() {
+               // Create pod with initial resources
+               sleepPodConfigs := k8s.SleepPodConfig{NS: ns, Time: 600, CPU: 100, Mem: 100, QOSClass: v1.PodQOSGuaranteed}
+               pod, err := k8s.InitSleepPod(sleepPodConfigs)
+               Ω(err).NotTo(HaveOccurred())
+
+               // Create pod
+               _, err = kClient.CreatePod(pod, ns)
+               Ω(err).NotTo(HaveOccurred())
+
+               // Wait for pod running
+               err = kClient.WaitForPodRunning(ns, pod.Name, 60*time.Second)
+               Ω(err).NotTo(HaveOccurred())
+
+               // Check if pod is scheduled by YuniKorn and verify CPU allocation is 100m
+               verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore", 100)
+
+               // Get initial pod state
+               pod, err = kClient.GetPod(pod.Name, ns)
+               Ω(err).NotTo(HaveOccurred())
+               initialStartTime := pod.Status.StartTime
+               initialRestartCount := pod.Status.ContainerStatuses[0].RestartCount
+
+               // Patch CPU/Memory to an excessive value
+               pod, err = kClient.ModifyResourceUsage(pod, ns, 100000, 100000)
+               Ω(err).NotTo(HaveOccurred())
+
+               // Wait for resource update to be reflected
+               err = utils.WaitForCondition(func() bool {
+                       currentPod, err := kClient.GetPod(pod.Name, ns)
+                       if err != nil {
+                               return false
+                       }
+                       return currentPod.Status.Resize == v1.PodResizeStatusInfeasible
+               }, 10*time.Second, 120*time.Second)
+               Ω(err).NotTo(HaveOccurred())
+
+               Ω(err).NotTo(HaveOccurred())
+               Ω(pod.Status.StartTime).To(Equal(initialStartTime), "Pod should not have restarted")
+               Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount), "Container should not have restarted")
+
+               // Verify pod resource usage is unchanged after setting an excessive value
+               verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore", 100)
+       })
+})
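
One note on the version gate in BeforeEach: version.Info reports Major and Minor as strings, so a
lexical comparison such as k8sVer.Minor < "32" only orders correctly while both sides have the
same number of digits, and some managed clusters report minors like "32+". A hedged sketch of a
numeric check, reusing the GetKubernetesVersion helper added above (assumes strconv and strings
are imported; the "+" trimming is illustrative, not part of this commit):

    k8sVer, err := kClient.GetKubernetesVersion()
    Ω(err).NotTo(HaveOccurred())
    major, majErr := strconv.Atoi(strings.TrimSuffix(k8sVer.Major, "+")) // e.g. "1"
    minor, minErr := strconv.Atoi(strings.TrimSuffix(k8sVer.Minor, "+")) // tolerate values like "32+"
    Ω(majErr).NotTo(HaveOccurred())
    Ω(minErr).NotTo(HaveOccurred())
    if major < 1 || (major == 1 && minor < 32) {
            ginkgo.Skip("InPlacePodVerticalScaling requires K8s 1.32+")
    }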

