This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 805d428b [YUNIKORN-3031] Add support for .spec.schedulingGates (#954)
805d428b is described below

commit 805d428b9016de769759b2311c129f98cc72702a
Author: Peter Bacsko <bacs...@gmail.com>
AuthorDate: Mon Mar 3 19:01:37 2025 +0100

    [YUNIKORN-3031] Add support for .spec.schedulingGates (#954)
    
    Closes: #954
    
    Signed-off-by: Peter Bacsko <bacs...@gmail.com>
---
 pkg/cache/context.go       |  30 ++++++++++---
 pkg/cache/context_test.go  |  70 +++++++++++++++++++++++++++++-
 pkg/shim/scheduler_test.go | 104 +++++++++++++++------------------------------
 3 files changed, 129 insertions(+), 75 deletions(-)
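
For context, .spec.schedulingGates (alpha in Kubernetes 1.26, GA in 1.30)
lets an external controller hold a pod back from scheduling until every named
gate has been removed; with this commit the shim honors that contract instead
of admitting gated pods immediately. Purely as illustration (not part of this
patch), a minimal client-go sketch of the lifecycle, where the gate name
"example.com/wait", the namespace, and the app-id annotation value are
placeholders:

    package gatedpods

    import (
        "context"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
    )

    // createGatedPod submits a pod that the scheduler can observe but must
    // not bind until all scheduling gates are cleared.
    func createGatedPod(ctx context.Context, client kubernetes.Interface) (*v1.Pod, error) {
        pod := &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                Name:        "gated-pod",
                Namespace:   "default",
                Annotations: map[string]string{"yunikorn.apache.org/app-id": "app-0001"},
            },
            Spec: v1.PodSpec{
                SchedulerName:   "yunikorn",
                SchedulingGates: []v1.PodSchedulingGate{{Name: "example.com/wait"}},
                Containers:      []v1.Container{{Name: "sleep", Image: "busybox"}},
            },
        }
        return client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
    }

    // ungate clears the gates; the API server only allows gates to be removed
    // after creation, never added, so this transition is one-way.
    func ungate(ctx context.Context, client kubernetes.Interface, pod *v1.Pod) (*v1.Pod, error) {
        pod = pod.DeepCopy()
        pod.Spec.SchedulingGates = nil
        return client.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{})
    }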

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 28cac93f..ff691194 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -202,7 +202,7 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
                        if applicationID == "" {
                                ctx.updateForeignPod(pod)
                        } else {
-                               ctx.updateYuniKornPod(applicationID, pod)
+                               ctx.updateYuniKornPod(applicationID, nil, pod)
                        }
                }
 
@@ -284,7 +284,7 @@ func (ctx *Context) AddPod(obj interface{}) {
        ctx.UpdatePod(nil, obj)
 }
 
-func (ctx *Context) UpdatePod(_, newObj interface{}) {
+func (ctx *Context) UpdatePod(oldObj, newObj interface{}) {
        ctx.lock.Lock()
        defer ctx.lock.Unlock()
        pod, err := utils.Convert2Pod(newObj)
@@ -292,15 +292,23 @@ func (ctx *Context) UpdatePod(_, newObj interface{}) {
                log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err))
                return
        }
+       var oldPod *v1.Pod
+       if oldObj != nil {
+               oldPod, err = utils.Convert2Pod(oldObj)
+               if err != nil {
+                       log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err))
+                       return
+               }
+       }
        applicationID := utils.GetApplicationIDFromPod(pod)
        if applicationID == "" {
                ctx.updateForeignPod(pod)
        } else {
-               ctx.updateYuniKornPod(applicationID, pod)
+               ctx.updateYuniKornPod(applicationID, oldPod, pod)
        }
 }
 
-func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
+func (ctx *Context) updateYuniKornPod(appID string, oldPod, pod *v1.Pod) {
        taskID := string(pod.UID)
        app := ctx.getApplication(appID)
        if app != nil {
@@ -317,7 +325,19 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
                return
        }
 
-       if ctx.schedulerCache.UpdatePod(pod) {
+       hasGates := len(pod.Spec.SchedulingGates) > 0
+       if hasGates && oldPod == nil {
+               gates := make([]string, 0, len(pod.Spec.SchedulingGates))
+               for _, gate := range pod.Spec.SchedulingGates {
+                       gates = append(gates, gate.Name)
+               }
+               events.GetRecorder().Eventf(pod.DeepCopy(), nil, v1.EventTypeNormal, "Scheduling", "Scheduling",
+                       "waiting for scheduling gates: %s", strings.Join(gates, ","))
+               log.Log(log.ShimContext).Info("pod is waiting for scheduling gates", zap.String("name", pod.Name), zap.Strings("gates", gates))
+       }
+
+       // always call UpdatePod() first to make sure the pod instance is the latest in the cache
+       if ctx.schedulerCache.UpdatePod(pod) && !hasGates {
                // pod was accepted; ensure the application and task objects have been created
                ctx.ensureAppAndTaskCreated(pod, app)
        }
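
Note on the control flow above: the event is emitted only when oldPod is nil,
i.e. the first time the shim sees the pod via AddPod(), so gate-removal
updates do not re-emit it. UpdatePod() on the scheduler cache is always
called first so the cached copy stays current; what is deferred for a gated
pod is only the creation of the application and task. Condensed, as a
paraphrase of the diff above (same identifiers, not new code):

    gated := len(pod.Spec.SchedulingGates) > 0
    if gated && oldPod == nil {
        // first sighting of a gated pod: surface the gate names once as a k8s event
    }
    // always refresh the cache so a later update that clears the gates is observed
    if ctx.schedulerCache.UpdatePod(pod) && !gated {
        ctx.ensureAppAndTaskCreated(pod, app) // register app and task only when ungated
    }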
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index a3a6fc92..ca71bc94 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -386,7 +386,7 @@ func TestAddPod(t *testing.T) {
        assert.Check(t, pod == nil, "terminated pod was added")
 }
 
-func TestUpdatePod(t *testing.T) {
+func TestUpdatePod(t *testing.T) { //nolint:funlen
        context := initContextForTest()
 
        pod1 := &v1.Pod{
@@ -445,6 +445,9 @@ func TestUpdatePod(t *testing.T) {
        context.UpdatePod(nil, nil)
        context.UpdatePod(nil, pod1)
        context.UpdatePod(pod1, nil)
+       context.UpdatePod(&v1.Node{}, pod2)
+       podFromCache := context.schedulerCache.GetPod(uid1)
+       assert.Equal(t, "new", podFromCache.Annotations["test.state"])
 
        // ensure a terminated pod is removed
        context.UpdatePod(pod1, pod3)
@@ -461,6 +464,71 @@ func TestUpdatePod(t *testing.T) {
        if assert.Check(t, found != nil, "pod not found after update") {
                assert.Check(t, found.GetAnnotations()["test.state"] == "updated", "pod state not updated")
        }
+
+       // scheduling gated pod
+       recorder := k8sEvents.NewFakeRecorder(1024)
+       events.SetRecorder(recorder)
+       defer events.SetRecorder(events.NewMockedRecorder())
+       pod4 := &v1.Pod{
+               TypeMeta: apis.TypeMeta{
+                       Kind:       "Pod",
+                       APIVersion: "v1",
+               },
+               ObjectMeta: apis.ObjectMeta{
+                       Name: appID2,
+                       UID:  uid2,
+                       Annotations: map[string]string{
+                               constants.AnnotationApplicationID: appID2,
+                       },
+               },
+               Spec: v1.PodSpec{
+                       SchedulerName: "yunikorn",
+                       SchedulingGates: []v1.PodSchedulingGate{
+                               {Name: "gate1"},
+                               {Name: "gate2"},
+                       },
+               },
+               Status: v1.PodStatus{
+                       Phase: v1.PodPending,
+               },
+       }
+       context.UpdatePod(nil, pod4)
+       // check that the event has been published
+       select {
+       case event := <-recorder.Events:
+               assert.Assert(t, strings.Contains(event, "waiting for scheduling gates: gate1,gate2"), "unexpected k8s event received for gated pod: %s", event)
+       default:
+               t.Error("no k8s event received for gated pod")
+       }
+       assert.Assert(t, context.schedulerCache.GetPod(uid2) != nil, "pod was not found in scheduler cache")
+       assert.Assert(t, context.getApplication(appID2) == nil, "application was created")
+       pod5 := pod4.DeepCopy()
+       pod5.Spec.SchedulingGates = []v1.PodSchedulingGate{{Name: "gate1"}} // remove a single gate
+       context.UpdatePod(pod4, pod5)
+       // no events expected
+       select {
+       case event := <-recorder.Events:
+               t.Errorf("no k8s event expected, got: %s", event)
+       default:
+       }
+       pod5Cache := context.schedulerCache.GetPod(uid2)
+       assert.Assert(t, pod5Cache != nil, "pod was removed from the scheduler cache")
+       assert.Assert(t, context.getApplication(appID2) == nil, "application was created unexpectedly")
+       assert.Equal(t, 1, len(pod5Cache.Spec.SchedulingGates), "pod was not updated properly in the cache")
+       pod6 := pod5.DeepCopy()
+       pod6.Spec.SchedulingGates = nil // remove remaining gate
+       context.UpdatePod(pod5, pod6)
+       // no events expected
+       select {
+       case event := <-recorder.Events:
+               t.Errorf("no k8s event expected, got: %s", event)
+       default:
+       }
+       pod6Cache := context.schedulerCache.GetPod(uid2)
+       assert.Assert(t, pod6Cache != nil, "pod was removed from the scheduler cache")
+       assert.Equal(t, 0, len(pod6Cache.Spec.SchedulingGates), "pod was not updated properly in the cache")
+       assert.Assert(t, context.getApplication(appID2) != nil, "application was not created")
+       assert.Assert(t, context.getTask(appID2, uid2) != nil, "task was not created")
 }
 
 func TestDeletePod(t *testing.T) {
diff --git a/pkg/shim/scheduler_test.go b/pkg/shim/scheduler_test.go
index 1babc11a..afdcf92c 100644
--- a/pkg/shim/scheduler_test.go
+++ b/pkg/shim/scheduler_test.go
@@ -21,6 +21,7 @@ package shim
 import (
        "fmt"
        "testing"
+       "time"
 
        "gotest.tools/v3/assert"
        v1 "k8s.io/api/core/v1"
@@ -38,8 +39,7 @@ import (
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
-func TestApplicationScheduling(t *testing.T) {
-       configData := `
+const configData = `
 partitions:
   - name: default
     queues:
@@ -55,6 +55,8 @@ partitions:
                 memory: 150000000
                 vcore: 20
 `
+
+func TestApplicationScheduling(t *testing.T) {
        // init and register scheduler
        cluster := MockScheduler{}
        cluster.init()
@@ -110,22 +112,6 @@ partitions:
 }
 
 func TestRejectApplications(t *testing.T) {
-       configData := `
-partitions:
-  - name: default
-    queues:
-      - name: root
-        submitacl: "*"
-        queues:
-          - name: a
-            resources:
-              guaranteed:
-                memory: 100000000
-                vcore: 10
-              max:
-                memory: 150000000
-                vcore: 20
-`
        // init and register scheduler
        cluster := MockScheduler{}
        cluster.init()
@@ -191,25 +177,6 @@ func TestSchedulerRegistrationFailed(t *testing.T) {
 }
 
 func TestTaskFailures(t *testing.T) {
-       configData := `
-partitions:
- -
-   name: default
-   queues:
-     -
-       name: root
-       submitacl: "*"
-       queues:
-         -
-           name: a
-           resources:
-             guaranteed:
-               memory: 100000000
-               vcore: 10
-             max:
-               memory: 100000000
-               vcore: 10
-`
        // init and register scheduler
        cluster := MockScheduler{}
        cluster.init()
@@ -261,22 +228,6 @@ partitions:
 
 // simulate PVC error during Context.AssumePod() call
 func TestAssumePodError(t *testing.T) {
-       configData := `
-partitions:
-  - name: default
-    queues:
-      - name: root
-        submitacl: "*"
-        queues:
-          - name: a
-            resources:
-              guaranteed:
-                memory: 100000000
-                vcore: 10
-              max:
-                memory: 150000000
-                vcore: 20
-`
        cluster := MockScheduler{}
        cluster.init()
        binder := test.NewVolumeBinderMock()
@@ -306,22 +257,6 @@ partitions:
 }
 
 func TestForeignPodTracking(t *testing.T) {
-       configData := `
-partitions:
-  - name: default
-    queues:
-      - name: root
-        submitacl: "*"
-        queues:
-          - name: a
-            resources:
-              guaranteed:
-                memory: 100000000
-                vcore: 10
-              max:
-                memory: 150000000
-                vcore: 20
-`
        cluster := MockScheduler{}
        cluster.init()
        assert.NilError(t, cluster.start(), "failed to start cluster")
@@ -374,6 +309,37 @@ partitions:
        assert.NilError(t, err)
 }
 
+func TestSchedulingGates(t *testing.T) {
+       cluster := MockScheduler{}
+       cluster.init()
+       assert.NilError(t, cluster.start(), "failed to start cluster")
+       defer cluster.stop()
+
+       err := cluster.updateConfig(configData, nil)
+       assert.NilError(t, err, "update config failed")
+       addNode(&cluster, "node-1")
+
+       podResource := common.NewResourceBuilder().
+               AddResource(siCommon.Memory, 50000000).
+               AddResource(siCommon.CPU, 5).
+               Build()
+       pod1 := createTestPod("root.a", "app0001", "task0001", podResource)
+       pod1.Spec.SchedulingGates = []v1.PodSchedulingGate{{Name: "gate"}}
+
+       cluster.AddPod(pod1)
+       time.Sleep(time.Second)
+       app := cluster.context.GetApplication("app0001")
+       assert.Assert(t, app == nil, "application should not exist in the shim")
+       coreApp := cluster.getApplicationFromCore("app0001", partitionName)
+       assert.Assert(t, coreApp == nil, "application should not exist in the core")
+
+       pod1Upd := pod1.DeepCopy()
+       pod1Upd.Spec.SchedulingGates = nil
+       cluster.UpdatePod(pod1, pod1Upd)
+       err = cluster.waitForApplicationStateInCore("app0001", partitionName, "Running")
+       assert.NilError(t, err, "application has not transitioned to Running state")
+}
+
 func createTestPod(queue string, appID string, taskID string, taskResource *si.Resource) *v1.Pod {
        containers := make([]v1.Container, 0)
        c1Resources := make(map[v1.ResourceName]resource.Quantity)

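The end-to-end test above mirrors the intended usage: an external controller
removes its gate once some admission condition holds, and that update drives
the new UpdatePod() path. A minimal, assumed helper for such a controller
(not part of this patch; gateName is a placeholder):

    package gatecontroller

    import v1 "k8s.io/api/core/v1"

    // releaseGate returns a copy of the pod with one named gate filtered out;
    // the caller still has to persist the copy with an Update() call.
    func releaseGate(pod *v1.Pod, gateName string) *v1.Pod {
        out := pod.DeepCopy()
        kept := make([]v1.PodSchedulingGate, 0, len(out.Spec.SchedulingGates))
        for _, g := range out.Spec.SchedulingGates {
            if g.Name != gateName {
                kept = append(kept, g)
            }
        }
        out.Spec.SchedulingGates = kept
        return out
    }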
