This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 0d896fd1 [YUNIKORN-3141] Evaluate Preemption possibilities through preconditions (#1038)
0d896fd1 is described below
commit 0d896fd14a60bdfde0f1a947366cadee38df1b44
Author: mani <[email protected]>
AuthorDate: Wed Nov 5 12:04:44 2025 +0530
[YUNIKORN-3141] Evaluate Preemption possibilities through preconditions (#1038)
Closes: #1038
Signed-off-by: mani <[email protected]>
---
pkg/scheduler/objects/application_test.go | 16 ++--
pkg/scheduler/objects/queue.go | 58 ++++++++----
pkg/scheduler/objects/quota_change_preemptor.go | 50 ++++++++++
.../objects/quota_change_preemptor_test.go | 101 +++++++++++++++++++++
4 files changed, 201 insertions(+), 24 deletions(-)
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 2323466a..e25597ad 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2238,29 +2238,29 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
assert.Assert(t, alloc2 != nil, "alloc2 expected")
// preemption max attempts exhausted
- preemptionAttemptsRemaining = 0
- result3 :=
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ maxAttemptsExhausted := 0
+ result3 :=
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
0}), true, 30*time.Second, &maxAttemptsExhausted, iterator, iterator, getNode)
assert.Assert(t, result3 == nil, "result3 not expected")
assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been
preempted")
log := ask3.GetAllocationLog()
assert.Equal(t, log[0].Message, common.PreemptionMaxAttemptsExhausted)
- assert.Equal(t, preemptionAttemptsRemaining, 0)
+ assert.Equal(t, maxAttemptsExhausted, 0)
- preemptionAttemptsRemaining = 10
+ maxAttemptsDecrease := 10
// on first attempt, not enough time has passed
- result3 =
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ result3 =
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
0}), true, 30*time.Second, &maxAttemptsDecrease, iterator, iterator, getNode)
assert.Assert(t, result3 == nil, "result3 not expected")
assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been
preempted")
assertAllocationLog(t, ask3)
- assert.Equal(t, preemptionAttemptsRemaining, 10)
+ assert.Equal(t, maxAttemptsDecrease, 10)
// pass the time and try again
ask3.createTime = ask3.createTime.Add(-30 * time.Second)
- result3 =
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ result3 =
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
0}), true, 30*time.Second, &maxAttemptsDecrease, iterator, iterator, getNode)
assert.Assert(t, result3 != nil && result3.Request != nil &&
result3.ResultType == Reserved, "alloc3 should be a reservation")
assert.Assert(t, alloc2.IsPreempted(), "alloc2 should have been
preempted")
- assert.Equal(t, preemptionAttemptsRemaining, 9)
+ assert.Equal(t, maxAttemptsDecrease, 9)
}
func TestTryAllocatePreemptNode(t *testing.T) {
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index db6b972a..71f30d6d 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -74,22 +74,24 @@ type Queue struct {
// The queue properties should be treated as immutable the value is a
merge of the
// parent properties with the config for this queue only manipulated
during creation
// of the queue or via a queue configuration update.
- properties map[string]string
- adminACL security.ACL // admin ACL
- submitACL security.ACL // submit ACL
- maxResource *resources.Resource // When not set, max =
nil
- guaranteedResource *resources.Resource // When not set,
Guaranteed == 0
- isLeaf bool // this is a leaf queue
or not (i.e. parent)
- isManaged bool // queue is part of the
config, not auto created
- stateMachine *fsm.FSM // the state of the
queue for scheduling
- stateTime time.Time // last time the state
was updated (needed for cleanup)
- maxRunningApps uint64
- runningApps uint64
- allocatingAcceptedApps map[string]bool
- template *template.Template
- queueEvents *schedEvt.QueueEvents
- appQueueMapping *AppQueueMapping // appID mapping to queues
- quotaChangePreemptionDelay uint64
+ properties map[string]string
+ adminACL security.ACL // admin ACL
+ submitACL security.ACL // submit ACL
+ maxResource *resources.Resource // When not set,
max = nil
+ guaranteedResource *resources.Resource // When not set,
Guaranteed == 0
+ isLeaf bool // this is a
leaf queue or not (i.e. parent)
+ isManaged bool // queue is part
of the config, not auto created
+ stateMachine *fsm.FSM // the state of
the queue for scheduling
+ stateTime time.Time // last time the
state was updated (needed for cleanup)
+ maxRunningApps uint64
+ runningApps uint64
+ allocatingAcceptedApps map[string]bool
+ template *template.Template
+ queueEvents *schedEvt.QueueEvents
+ appQueueMapping *AppQueueMapping // appID mapping to
queues
+ quotaChangePreemptionDelay uint64
+ hasTriggerredQuotaChangePreemption bool
+ isQuotaChangePreemptionRunning bool
locking.RWMutex
}
@@ -2076,3 +2078,27 @@ func (sq *Queue) recalculatePriority() int32 {
sq.currentPriority = curr
return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
}
+
+func (sq *Queue) MarkTriggerredQuotaChangePreemption() {
+ sq.Lock()
+ defer sq.Unlock()
+ sq.hasTriggerredQuotaChangePreemption = true
+}
+
+func (sq *Queue) HasTriggerredQuotaChangePreemption() bool {
+ sq.RLock()
+ defer sq.RUnlock()
+ return sq.hasTriggerredQuotaChangePreemption
+}
+
+func (sq *Queue) MarkQuotaChangePreemptionRunning() {
+ sq.Lock()
+ defer sq.Unlock()
+ sq.isQuotaChangePreemptionRunning = true
+}
+
+func (sq *Queue) IsQuotaChangePreemptionRunning() bool {
+ sq.RLock()
+ defer sq.RUnlock()
+ return sq.isQuotaChangePreemptionRunning
+}
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go b/pkg/scheduler/objects/quota_change_preemptor.go
new file mode 100644
index 00000000..8634ba2b
--- /dev/null
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -0,0 +1,50 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+type QuotaChangePreemptionContext struct {
+ queue *Queue
+}
+
+func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext {
+ preemptor := &QuotaChangePreemptionContext{
+ queue: queue,
+ }
+ return preemptor
+}
+
+func (qcp *QuotaChangePreemptionContext) CheckPreconditions() bool {
+ if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() ||
qcp.queue.HasTriggerredQuotaChangePreemption() ||
qcp.queue.IsQuotaChangePreemptionRunning() {
+ return false
+ }
+ if
qcp.queue.GetMaxResource().StrictlyGreaterThanOnlyExisting(qcp.queue.GetAllocatedResource())
{
+ return false
+ }
+ return true
+}
+
+func (qcp *QuotaChangePreemptionContext) tryPreemption() {
+ // quota change preemption has started, so mark the flag
+ qcp.queue.MarkQuotaChangePreemptionRunning()
+
+ // Preemption logic goes here
+
+ // quota change preemption has really evicted victims, so mark the flag
+ qcp.queue.MarkTriggerredQuotaChangePreemption()
+}
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go b/pkg/scheduler/objects/quota_change_preemptor_test.go
new file mode 100644
index 00000000..f7cd673d
--- /dev/null
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -0,0 +1,101 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+ "testing"
+
+ "gotest.tools/v3/assert"
+
+ "github.com/apache/yunikorn-core/pkg/common/configs"
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
+func TestQuotaChangeCheckPreconditions(t *testing.T) {
+ parentConfig := configs.QueueConfig{
+ Name: "parent",
+ Parent: true,
+ Resources: configs.Resources{
+ Max: map[string]string{"memory": "1000"},
+ },
+ }
+ parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
+ assert.NilError(t, err)
+
+ leafRes := configs.Resources{
+ Max: map[string]string{"memory": "1000"},
+ }
+ leaf, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "leaf",
+ Resources: leafRes,
+ }, parent, false, nil)
+ assert.NilError(t, err)
+
+ dynamicLeaf, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "dynamic-leaf",
+ Resources: leafRes,
+ }, parent, false, nil)
+ assert.NilError(t, err)
+ dynamicLeaf.isManaged = false
+
+ alreadyPreemptionRunning, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "leaf-already-preemption-running",
+ Resources: leafRes,
+ }, parent, false, nil)
+ assert.NilError(t, err)
+ alreadyPreemptionRunning.MarkQuotaChangePreemptionRunning()
+
+ alreadyTriggeredPreemption, err :=
NewConfiguredQueue(configs.QueueConfig{
+ Name: "leaf-already-triggerred-running",
+ Resources: leafRes,
+ }, parent, false, nil)
+ assert.NilError(t, err)
+ alreadyTriggeredPreemption.MarkTriggerredQuotaChangePreemption()
+
+ usageExceededMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "leaf-usage-exceeded-max",
+ Resources: leafRes,
+ }, parent, false, nil)
+ assert.NilError(t, err)
+ usageExceededMaxQueue.allocatedResource =
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000})
+
+ testCases := []struct {
+ name string
+ queue *Queue
+ preconditionResult bool
+ }{
+ {"parent queue", parent, false},
+ {"leaf queue", leaf, false},
+ {"dynamic leaf queue", dynamicLeaf, false},
+ {"leaf queue, already preemption process started or running",
alreadyPreemptionRunning, false},
+ {"leaf queue, already triggerred preemption",
alreadyTriggeredPreemption, false},
+ {"leaf queue, usage exceeded max resources",
usageExceededMaxQueue, true},
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ preemptor := NewQuotaChangePreemptor(tc.queue)
+ assert.Equal(t, preemptor.CheckPreconditions(),
tc.preconditionResult)
+ if tc.preconditionResult {
+ preemptor.tryPreemption()
+ assert.Equal(t,
tc.queue.HasTriggerredQuotaChangePreemption(), true)
+ assert.Equal(t,
tc.queue.IsQuotaChangePreemptionRunning(), true)
+ }
+ })
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]