This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push: new 805d428b [YUNIKORN-3031] Add support for .spec.schedulingGates (#954) 805d428b is described below commit 805d428b9016de769759b2311c129f98cc72702a Author: Peter Bacsko <bacs...@gmail.com> AuthorDate: Mon Mar 3 19:01:37 2025 +0100 [YUNIKORN-3031] Add support for .spec.schedulingGates (#954) Closes: #954 Signed-off-by: Peter Bacsko <bacs...@gmail.com> --- pkg/cache/context.go | 30 ++++++++++--- pkg/cache/context_test.go | 70 +++++++++++++++++++++++++++++- pkg/shim/scheduler_test.go | 104 +++++++++++++++------------------------------ 3 files changed, 129 insertions(+), 75 deletions(-) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index 28cac93f..ff691194 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -202,7 +202,7 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) { if applicationID == "" { ctx.updateForeignPod(pod) } else { - ctx.updateYuniKornPod(applicationID, pod) + ctx.updateYuniKornPod(applicationID, nil, pod) } } @@ -284,7 +284,7 @@ func (ctx *Context) AddPod(obj interface{}) { ctx.UpdatePod(nil, obj) } -func (ctx *Context) UpdatePod(_, newObj interface{}) { +func (ctx *Context) UpdatePod(oldObj, newObj interface{}) { ctx.lock.Lock() defer ctx.lock.Unlock() pod, err := utils.Convert2Pod(newObj) @@ -292,15 +292,23 @@ func (ctx *Context) UpdatePod(_, newObj interface{}) { log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err)) return } + var oldPod *v1.Pod + if oldObj != nil { + oldPod, err = utils.Convert2Pod(oldObj) + if err != nil { + log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err)) + return + } + } applicationID := utils.GetApplicationIDFromPod(pod) if applicationID == "" { ctx.updateForeignPod(pod) } else { - ctx.updateYuniKornPod(applicationID, pod) + ctx.updateYuniKornPod(applicationID, oldPod, pod) } } -func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) { +func (ctx *Context) 
updateYuniKornPod(appID string, oldPod, pod *v1.Pod) { taskID := string(pod.UID) app := ctx.getApplication(appID) if app != nil { @@ -317,7 +325,19 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) { return } - if ctx.schedulerCache.UpdatePod(pod) { + hasGates := len(pod.Spec.SchedulingGates) > 0 + if hasGates && oldPod == nil { + gates := make([]string, 0, len(pod.Spec.SchedulingGates)) + for _, gate := range pod.Spec.SchedulingGates { + gates = append(gates, gate.Name) + } + events.GetRecorder().Eventf(pod.DeepCopy(), nil, v1.EventTypeNormal, "Scheduling", "Scheduling", + "waiting for scheduling gates: %s", strings.Join(gates, ",")) + log.Log(log.ShimContext).Info("pod is waiting for scheduling gates", zap.String("name", pod.Name), zap.Strings("gates", gates)) + } + + // always call UpdatePod() first to make sure the pod instance is the latest in the cache + if ctx.schedulerCache.UpdatePod(pod) && !hasGates { // pod was accepted; ensure the application and task objects have been created ctx.ensureAppAndTaskCreated(pod, app) } diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go index a3a6fc92..ca71bc94 100644 --- a/pkg/cache/context_test.go +++ b/pkg/cache/context_test.go @@ -386,7 +386,7 @@ func TestAddPod(t *testing.T) { assert.Check(t, pod == nil, "terminated pod was added") } -func TestUpdatePod(t *testing.T) { +func TestUpdatePod(t *testing.T) { //nolint:funlen context := initContextForTest() pod1 := &v1.Pod{ @@ -445,6 +445,9 @@ func TestUpdatePod(t *testing.T) { context.UpdatePod(nil, nil) context.UpdatePod(nil, pod1) context.UpdatePod(pod1, nil) + context.UpdatePod(&v1.Node{}, pod2) + podFromCache := context.schedulerCache.GetPod(uid1) + assert.Equal(t, "new", podFromCache.Annotations["test.state"]) // ensure a terminated pod is removed context.UpdatePod(pod1, pod3) @@ -461,6 +464,71 @@ func TestUpdatePod(t *testing.T) { if assert.Check(t, found != nil, "pod not found after update") { assert.Check(t, 
found.GetAnnotations()["test.state"] == "updated", "pod state not updated") } + + // scheduling gated pod + recorder := k8sEvents.NewFakeRecorder(1024) + events.SetRecorder(recorder) + defer events.SetRecorder(events.NewMockedRecorder()) + pod4 := &v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: appID2, + UID: uid2, + Annotations: map[string]string{ + constants.AnnotationApplicationID: appID2, + }, + }, + Spec: v1.PodSpec{ + SchedulerName: "yunikorn", + SchedulingGates: []v1.PodSchedulingGate{ + {Name: "gate1"}, + {Name: "gate2"}, + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + } + context.UpdatePod(nil, pod4) + // check that the event has been published + select { + case event := <-recorder.Events: + assert.Assert(t, strings.Contains(event, "waiting for scheduling gates: gate1,gate2"), "unexpected k8s event received for gated pod: %s", event) + default: + t.Error("no k8s event received for gated pod") + } + assert.Assert(t, context.schedulerCache.GetPod(uid2) != nil, "pod was not found in scheduler cache") + assert.Assert(t, context.getApplication(appID2) == nil, "application was created") + pod5 := pod4.DeepCopy() + pod5.Spec.SchedulingGates = []v1.PodSchedulingGate{{Name: "gate1"}} // remove a single gate + context.UpdatePod(pod4, pod5) + // no events expected + select { + case event := <-recorder.Events: + t.Errorf("no k8s event expected, got: %s", event) + default: + } + pod5Cache := context.schedulerCache.GetPod(uid2) + assert.Assert(t, pod5Cache != nil, "pod was removed from the scheduler cache") + assert.Assert(t, context.getApplication(appID2) == nil, "application was created unexpectedly") + assert.Equal(t, 1, len(pod5Cache.Spec.SchedulingGates), "pod was not updated properly in the cache") + pod6 := pod5.DeepCopy() + pod6.Spec.SchedulingGates = nil // remove remaining gate + context.UpdatePod(pod5, pod6) + // no events expected + select { + case event := <-recorder.Events: 
+ t.Errorf("no k8s event expected, got: %s", event) + default: + } + pod6Cache := context.schedulerCache.GetPod(uid2) + assert.Assert(t, pod6Cache != nil, "pod was removed from the scheduler cache") + assert.Equal(t, 0, len(pod6Cache.Spec.SchedulingGates), "pod was not updated properly in the cache") + assert.Assert(t, context.getApplication(appID2) != nil, "application was not created") + assert.Assert(t, context.getTask(appID2, uid2) != nil, "task was not created") } func TestDeletePod(t *testing.T) { diff --git a/pkg/shim/scheduler_test.go b/pkg/shim/scheduler_test.go index 1babc11a..afdcf92c 100644 --- a/pkg/shim/scheduler_test.go +++ b/pkg/shim/scheduler_test.go @@ -21,6 +21,7 @@ package shim import ( "fmt" "testing" + "time" "gotest.tools/v3/assert" v1 "k8s.io/api/core/v1" @@ -38,8 +39,7 @@ import ( "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) -func TestApplicationScheduling(t *testing.T) { - configData := ` +const configData = ` partitions: - name: default queues: @@ -55,6 +55,8 @@ partitions: memory: 150000000 vcore: 20 ` + +func TestApplicationScheduling(t *testing.T) { // init and register scheduler cluster := MockScheduler{} cluster.init() @@ -110,22 +112,6 @@ partitions: } func TestRejectApplications(t *testing.T) { - configData := ` -partitions: - - name: default - queues: - - name: root - submitacl: "*" - queues: - - name: a - resources: - guaranteed: - memory: 100000000 - vcore: 10 - max: - memory: 150000000 - vcore: 20 -` // init and register scheduler cluster := MockScheduler{} cluster.init() @@ -191,25 +177,6 @@ func TestSchedulerRegistrationFailed(t *testing.T) { } func TestTaskFailures(t *testing.T) { - configData := ` -partitions: - - - name: default - queues: - - - name: root - submitacl: "*" - queues: - - - name: a - resources: - guaranteed: - memory: 100000000 - vcore: 10 - max: - memory: 100000000 - vcore: 10 -` // init and register scheduler cluster := MockScheduler{} cluster.init() @@ -261,22 +228,6 @@ partitions: // 
simulate PVC error during Context.AssumePod() call func TestAssumePodError(t *testing.T) { - configData := ` -partitions: - - name: default - queues: - - name: root - submitacl: "*" - queues: - - name: a - resources: - guaranteed: - memory: 100000000 - vcore: 10 - max: - memory: 150000000 - vcore: 20 -` cluster := MockScheduler{} cluster.init() binder := test.NewVolumeBinderMock() @@ -306,22 +257,6 @@ partitions: } func TestForeignPodTracking(t *testing.T) { - configData := ` -partitions: - - name: default - queues: - - name: root - submitacl: "*" - queues: - - name: a - resources: - guaranteed: - memory: 100000000 - vcore: 10 - max: - memory: 150000000 - vcore: 20 -` cluster := MockScheduler{} cluster.init() assert.NilError(t, cluster.start(), "failed to start cluster") @@ -374,6 +309,37 @@ partitions: assert.NilError(t, err) } +func TestSchedulingGates(t *testing.T) { + cluster := MockScheduler{} + cluster.init() + assert.NilError(t, cluster.start(), "failed to start cluster") + defer cluster.stop() + + err := cluster.updateConfig(configData, nil) + assert.NilError(t, err, "update config failed") + addNode(&cluster, "node-1") + + podResource := common.NewResourceBuilder(). + AddResource(siCommon.Memory, 50000000). + AddResource(siCommon.CPU, 5). 
+ Build() + pod1 := createTestPod("root.a", "app0001", "task0001", podResource) + pod1.Spec.SchedulingGates = []v1.PodSchedulingGate{{Name: "gate"}} + + cluster.AddPod(pod1) + time.Sleep(time.Second) + app := cluster.context.GetApplication("app0001") + assert.Assert(t, app == nil, "application should not exist in the shim") + coreApp := cluster.getApplicationFromCore("app0001", partitionName) + assert.Assert(t, coreApp == nil, "application should not exist in the core") + + pod1Upd := pod1.DeepCopy() + pod1Upd.Spec.SchedulingGates = nil + cluster.UpdatePod(pod1, pod1Upd) + err = cluster.waitForApplicationStateInCore("app0001", partitionName, "Running") + assert.NilError(t, err, "application has not transitioned to Running state") +} + func createTestPod(queue string, appID string, taskID string, taskResource *si.Resource) *v1.Pod { containers := make([]v1.Container, 0) c1Resources := make(map[v1.ResourceName]resource.Quantity) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org For additional commands, e-mail: issues-h...@yunikorn.apache.org