This is an automated email from the ASF dual-hosted git repository. pbacsko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push: new c7a44ce5 [YUNIKORN-2864] Add e2e test for InPlacePodVerticalScaling feature (#965) c7a44ce5 is described below commit c7a44ce535994f83e0efe2af9ab88be04e5ecfdd Author: kaichiachen <kaichia...@gmail.com> AuthorDate: Thu Apr 10 15:44:57 2025 +0200 [YUNIKORN-2864] Add e2e test for InPlacePodVerticalScaling feature (#965) Closes: #965 Signed-off-by: Peter Bacsko <bacs...@gmail.com> --- test/e2e/framework/helpers/k8s/k8s_utils.go | 44 ++++ .../pod_resource_scaling_suite_test.go | 79 ++++++++ .../pod_resource_scaling_test.go | 222 +++++++++++++++++++++ 3 files changed, 345 insertions(+) diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go index 985ef93b..e63f7ad7 100644 --- a/test/e2e/framework/helpers/k8s/k8s_utils.go +++ b/test/e2e/framework/helpers/k8s/k8s_utils.go @@ -18,12 +18,14 @@ package k8s import ( "context" + "encoding/json" "errors" "fmt" "net/http" "net/url" "os" "path/filepath" + "strconv" "strings" "time" @@ -41,8 +43,10 @@ import ( storagev1 "k8s.io/api/storage/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/version" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -152,6 +156,15 @@ func (k *KubeCtl) GetClient() *kubernetes.Clientset { return k.clientSet } +// GetKubernetesVersion returns the version info from the Kubernetes server +func (k *KubeCtl) GetKubernetesVersion() (*version.Info, error) { + k8sVer, err := k.clientSet.Discovery().ServerVersion() + if err != nil { + return k8sVer, err + } + return k8sVer, nil +} + func (k *KubeCtl) GetKubeConfig() (*rest.Config, error) { if k.kubeConfig != nil { return k.kubeConfig, nil @@ -221,6 +234,37 @@ func (k *KubeCtl) UpdatePod(pod *v1.Pod, namespace string) (*v1.Pod, error) { return k.clientSet.CoreV1().Pods(namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) } +func (k *KubeCtl) PatchPod(pod *v1.Pod, namespace string, patch []map[string]interface{}, subresources ...string) (*v1.Pod, error) { + patchBytes, err := json.Marshal(patch) + if err != nil { + return nil, err + } + return k.clientSet.CoreV1().Pods(namespace).Patch( + context.TODO(), + pod.Name, + types.JSONPatchType, + patchBytes, + metav1.PatchOptions{}, + subresources..., + ) +} + +func (k *KubeCtl) ModifyResourceUsage(pod *v1.Pod, namespace string, newVcore int64, newMemory int64) (*v1.Pod, error) { + patch := []map[string]interface{}{ + { + "op": "replace", + "path": "/spec/containers/0/resources/requests/cpu", + "value": strconv.FormatInt(newVcore, 10) + "m", + }, + { + "op": "replace", + "path": "/spec/containers/0/resources/requests/memory", + "value": strconv.FormatInt(newMemory, 10) + "Mi", + }, + } + return k.PatchPod(pod, namespace, patch, "resize") +} + func (k *KubeCtl) DeletePodAnnotation(pod *v1.Pod, namespace, annotation string) (*v1.Pod, error) { annotations := pod.Annotations delete(annotations, annotation) diff --git a/test/e2e/pod_resource_scaling/pod_resource_scaling_suite_test.go b/test/e2e/pod_resource_scaling/pod_resource_scaling_suite_test.go new file mode 100644 index 00000000..0049eb75 --- /dev/null +++ b/test/e2e/pod_resource_scaling/pod_resource_scaling_suite_test.go @@ -0,0 +1,79 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package pod_resource_scaling + +import ( + "path/filepath" + "runtime" + "testing" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/ginkgo/v2/reporters" + "github.com/onsi/gomega" + + "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager" + "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common" + "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s" + "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn" +) + +func init() { + configmanager.YuniKornTestConfig.ParseFlags() +} + +func TestPodResourceScaling(t *testing.T) { + ginkgo.ReportAfterSuite("TestPodResourceScaling", func(report ginkgo.Report) { + err := common.CreateJUnitReportDir() + Ω(err).NotTo(gomega.HaveOccurred()) + err = reporters.GenerateJUnitReportWithConfig( + report, + filepath.Join(configmanager.YuniKornTestConfig.LogDir, "TEST-pod_resource_scaling_junit.xml"), + reporters.JunitReportConfig{OmitSpecLabels: true}, + ) + Ω(err).NotTo(HaveOccurred()) + }) + gomega.RegisterFailHandler(ginkgo.Fail) + ginkgo.RunSpecs(t, "Pod Resource Scaling Suite") +} + +var _ = ginkgo.BeforeSuite(func() { + _, filename, _, _ := runtime.Caller(0) + suiteName = common.GetSuiteName(filename) + + // Initializing kubectl client + kClient = k8s.KubeCtl{} + Ω(kClient.SetClient()).To(gomega.BeNil()) + + // Initializing rest client + restClient = yunikorn.RClient{} + Ω(restClient).NotTo(gomega.BeNil()) + yunikorn.EnsureYuniKornConfigsPresent() + yunikorn.UpdateConfigMapWrapper(oldConfigMap, "") +}) + +var _ = ginkgo.AfterSuite(func() { + yunikorn.RestoreConfigMapWrapper(oldConfigMap) +}) + +var Ω = gomega.Ω +var HaveOccurred = gomega.HaveOccurred +var Equal = gomega.Equal +var By = ginkgo.By +var BeforeEach = ginkgo.BeforeEach +var AfterEach = ginkgo.AfterEach diff --git a/test/e2e/pod_resource_scaling/pod_resource_scaling_test.go b/test/e2e/pod_resource_scaling/pod_resource_scaling_test.go new file mode 100644 index 00000000..95d0fd53 --- /dev/null +++ b/test/e2e/pod_resource_scaling/pod_resource_scaling_test.go @@ -0,0 +1,222 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package pod_resource_scaling + +import ( + "fmt" + "time" + + "github.com/onsi/ginkgo/v2" + v1 "k8s.io/api/core/v1" + + "github.com/apache/yunikorn-k8shim/pkg/common/utils" + "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common" + "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s" + "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn" +) + +var kClient k8s.KubeCtl +var restClient yunikorn.RClient +var err error +var ns string +var oldConfigMap = new(v1.ConfigMap) +var suiteName string + +var _ = BeforeEach(func() { + // Skip if K8s version < 1.32 + k8sVer, err := kClient.GetKubernetesVersion() + Ω(err).NotTo(HaveOccurred()) + if k8sVer.Major < "1" || (k8sVer.Major == "1" && k8sVer.Minor < "32") { + ginkgo.Skip("InPlacePodVerticalScaling requires K8s 1.32+") + } + + // Create test namespace + ns = "test-" + common.RandSeq(10) + _, err = kClient.CreateNamespace(ns, nil) + Ω(err).NotTo(HaveOccurred()) +}) + +var _ = ginkgo.AfterEach(func() { + By("Killing all pods") + err := kClient.DeletePods(ns) + Ω(err).NotTo(HaveOccurred()) + err = kClient.DeleteNamespace(ns) + Ω(err).NotTo(HaveOccurred()) +}) + +func verifyYunikornResourceUsage(appID, resourceName string, value int64) { + err = utils.WaitForCondition(func() bool { + app, err := restClient.GetAppInfo("default", "root."+ns, appID) + if err != nil || app == nil { + fmt.Println(err) + return false + } + + if app.Allocations == nil { + fmt.Println(app) + return false + } + + for _, alloc := range app.Allocations { + resVal, exists := alloc.ResourcePerAlloc[resourceName] + if !exists { + return false + } + + if resVal == value { + return true + } + } + + return false + }, 30*time.Second, 120*time.Second) + Ω(err).NotTo(HaveOccurred(), fmt.Sprintf("Pod should be scheduled by YuniKorn with correct resource(%s) allocation", resourceName)) +} + +var _ = ginkgo.Describe("InPlacePodVerticalScaling", func() { + ginkgo.It("Pod resources(cpu/memory) resize up", func() { + // Create pod with initial resources + sleepPodConfigs := k8s.SleepPodConfig{NS: ns, Time: 600, CPU: 100, Mem: 100, QOSClass: v1.PodQOSGuaranteed} + pod, err := k8s.InitSleepPod(sleepPodConfigs) + Ω(err).NotTo(HaveOccurred()) + + // Create pod + pod, err = kClient.CreatePod(pod, ns) + Ω(err).NotTo(HaveOccurred()) + + // Wait for pod running + err = kClient.WaitForPodRunning(ns, pod.Name, 60*time.Second) + Ω(err).NotTo(HaveOccurred()) + + // Check if pod is scheduled by YuniKorn and verify CPU allocation is 100m + verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore", 100) + + // Get initial pod restart count + pod, err = kClient.GetPod(pod.Name, ns) + Ω(err).NotTo(HaveOccurred()) + initialRestartCount := pod.Status.ContainerStatuses[0].RestartCount + + pod, err = kClient.ModifyResourceUsage(pod, ns, 200, 100) + Ω(err).NotTo(HaveOccurred()) + + Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount), "Container should not have restarted") + verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore", 200) + + pod, err = kClient.ModifyResourceUsage(pod, ns, 200, 200) + Ω(err).NotTo(HaveOccurred()) + + Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount), "Container should not have restarted") + verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "memory", 200*1024*1024) + }) + + ginkgo.It("Pod resources(cpu/memory) resize down", func() { + // Create pod with initial resources + sleepPodConfigs := k8s.SleepPodConfig{NS: ns, Time: 600, CPU: 200, Mem: 200, QOSClass: v1.PodQOSGuaranteed} + pod, err := k8s.InitSleepPod(sleepPodConfigs) + Ω(err).NotTo(HaveOccurred()) + + // Create pod + pod, err = kClient.CreatePod(pod, ns) + Ω(err).NotTo(HaveOccurred()) + + // Wait for pod running + err = kClient.WaitForPodRunning(ns, pod.Name, 60*time.Second) + Ω(err).NotTo(HaveOccurred()) + + // Check if pod is scheduled by YuniKorn and verify CPU allocation is 100m + verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore", 200) + + // Get initial pod state + pod, err = kClient.GetPod(pod.Name, ns) + Ω(err).NotTo(HaveOccurred()) + initialStartTime := pod.Status.StartTime + initialRestartCount := pod.Status.ContainerStatuses[0].RestartCount + + pod, err = kClient.ModifyResourceUsage(pod, ns, 100, 200) + Ω(err).NotTo(HaveOccurred()) + + // Wait for resource update to be reflected + err = utils.WaitForCondition(func() bool { + currentPod, err := kClient.GetPod(pod.Name, ns) + if err != nil { + return false + } + return currentPod.Spec.Containers[0].Resources.Requests.Cpu().MilliValue() == int64(100) + }, 10*time.Second, 120*time.Second) + Ω(err).NotTo(HaveOccurred()) + + Ω(err).NotTo(HaveOccurred()) + Ω(pod.Status.StartTime).To(Equal(initialStartTime), "Pod should not have restarted") + Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount), "Container should not have restarted") + verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore", 100) + + pod, err = kClient.ModifyResourceUsage(pod, ns, 100, 100) + Ω(err).NotTo(HaveOccurred()) // Expect an error as memory cannot be decreased + + Ω(err).NotTo(HaveOccurred()) + Ω(pod.Status.StartTime).To(Equal(initialStartTime), "Pod should not have restarted") + Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount), "Container should not have restarted") + verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "memory", 100*1024*1024) + }) + + ginkgo.It("Pod resources(cpu/memory) resize to excessive values should fail", func() { + // Create pod with initial resources + sleepPodConfigs := k8s.SleepPodConfig{NS: ns, Time: 600, CPU: 100, Mem: 100, QOSClass: v1.PodQOSGuaranteed} + pod, err := k8s.InitSleepPod(sleepPodConfigs) + Ω(err).NotTo(HaveOccurred()) + + // Create pod + _, err = kClient.CreatePod(pod, ns) + Ω(err).NotTo(HaveOccurred()) + + // Wait for pod running + err = kClient.WaitForPodRunning(ns, pod.Name, 60*time.Second) + Ω(err).NotTo(HaveOccurred()) + + // Check if pod is scheduled by YuniKorn and verify CPU allocation is 100m + verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore", 100) + + // Get initial pod state + pod, err = kClient.GetPod(pod.Name, ns) + Ω(err).NotTo(HaveOccurred()) + initialStartTime := pod.Status.StartTime + initialRestartCount := pod.Status.ContainerStatuses[0].RestartCount + + // Patch CPU/Memory to an excessive value + pod, err = kClient.ModifyResourceUsage(pod, ns, 100000, 100000) + Ω(err).NotTo(HaveOccurred()) + + // Wait for resource update to be reflected + err = utils.WaitForCondition(func() bool { + currentPod, err := kClient.GetPod(pod.Name, ns) + if err != nil { + return false + } + return currentPod.Status.Resize == v1.PodResizeStatusInfeasible + }, 10*time.Second, 120*time.Second) + Ω(err).NotTo(HaveOccurred()) + + Ω(err).NotTo(HaveOccurred()) + Ω(pod.Status.StartTime).To(Equal(initialStartTime), "Pod should not have restarted") + Ω(pod.Status.ContainerStatuses[0].RestartCount).To(Equal(initialRestartCount), "Container should not have restarted") + + // Verify pod resource usage is unchanged after set an excessive value + verifyYunikornResourceUsage(pod.ObjectMeta.Labels["applicationId"], "vcore", 100) + }) +}) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org For additional commands, e-mail: issues-h...@yunikorn.apache.org