This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d896fd1 [YUNIKORN-3141] Evaluate Preemption possibilities through 
preconditions (#1038)
0d896fd1 is described below

commit 0d896fd14a60bdfde0f1a947366cadee38df1b44
Author: mani <[email protected]>
AuthorDate: Wed Nov 5 12:04:44 2025 +0530

    [YUNIKORN-3141] Evaluate Preemption possibilities through preconditions 
(#1038)
    
    Closes: #1038
    
    Signed-off-by: mani <[email protected]>
---
 pkg/scheduler/objects/application_test.go          |  16 ++--
 pkg/scheduler/objects/queue.go                     |  58 ++++++++----
 pkg/scheduler/objects/quota_change_preemptor.go    |  50 ++++++++++
 .../objects/quota_change_preemptor_test.go         | 101 +++++++++++++++++++++
 4 files changed, 201 insertions(+), 24 deletions(-)

diff --git a/pkg/scheduler/objects/application_test.go 
b/pkg/scheduler/objects/application_test.go
index 2323466a..e25597ad 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2238,29 +2238,29 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
        assert.Assert(t, alloc2 != nil, "alloc2 expected")
 
        // preemption max attempts exhausted
-       preemptionAttemptsRemaining = 0
-       result3 := 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       maxAttemptsExhausted := 0
+       result3 := 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 0}), true, 30*time.Second, &maxAttemptsExhausted, iterator, iterator, getNode)
        assert.Assert(t, result3 == nil, "result3 not expected")
        assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been 
preempted")
        log := ask3.GetAllocationLog()
        assert.Equal(t, log[0].Message, common.PreemptionMaxAttemptsExhausted)
-       assert.Equal(t, preemptionAttemptsRemaining, 0)
+       assert.Equal(t, maxAttemptsExhausted, 0)
 
-       preemptionAttemptsRemaining = 10
+       maxAttemptsDecrease := 10
 
        // on first attempt, not enough time has passed
-       result3 = 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       result3 = 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 0}), true, 30*time.Second, &maxAttemptsDecrease, iterator, iterator, getNode)
        assert.Assert(t, result3 == nil, "result3 not expected")
        assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been 
preempted")
        assertAllocationLog(t, ask3)
-       assert.Equal(t, preemptionAttemptsRemaining, 10)
+       assert.Equal(t, maxAttemptsDecrease, 10)
 
        // pass the time and try again
        ask3.createTime = ask3.createTime.Add(-30 * time.Second)
-       result3 = 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       result3 = 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 0}), true, 30*time.Second, &maxAttemptsDecrease, iterator, iterator, getNode)
        assert.Assert(t, result3 != nil && result3.Request != nil && 
result3.ResultType == Reserved, "alloc3 should be a reservation")
        assert.Assert(t, alloc2.IsPreempted(), "alloc2 should have been 
preempted")
-       assert.Equal(t, preemptionAttemptsRemaining, 9)
+       assert.Equal(t, maxAttemptsDecrease, 9)
 }
 
 func TestTryAllocatePreemptNode(t *testing.T) {
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index db6b972a..71f30d6d 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -74,22 +74,24 @@ type Queue struct {
        // The queue properties should be treated as immutable the value is a 
merge of the
        // parent properties with the config for this queue only manipulated 
during creation
        // of the queue or via a queue configuration update.
-       properties                 map[string]string
-       adminACL                   security.ACL        // admin ACL
-       submitACL                  security.ACL        // submit ACL
-       maxResource                *resources.Resource // When not set, max = 
nil
-       guaranteedResource         *resources.Resource // When not set, 
Guaranteed == 0
-       isLeaf                     bool                // this is a leaf queue 
or not (i.e. parent)
-       isManaged                  bool                // queue is part of the 
config, not auto created
-       stateMachine               *fsm.FSM            // the state of the 
queue for scheduling
-       stateTime                  time.Time           // last time the state 
was updated (needed for cleanup)
-       maxRunningApps             uint64
-       runningApps                uint64
-       allocatingAcceptedApps     map[string]bool
-       template                   *template.Template
-       queueEvents                *schedEvt.QueueEvents
-       appQueueMapping            *AppQueueMapping // appID mapping to queues
-       quotaChangePreemptionDelay uint64
+       properties                         map[string]string
+       adminACL                           security.ACL        // admin ACL
+       submitACL                          security.ACL        // submit ACL
+       maxResource                        *resources.Resource // When not set, 
max = nil
+       guaranteedResource                 *resources.Resource // When not set, 
Guaranteed == 0
+       isLeaf                             bool                // this is a 
leaf queue or not (i.e. parent)
+       isManaged                          bool                // queue is part 
of the config, not auto created
+       stateMachine                       *fsm.FSM            // the state of 
the queue for scheduling
+       stateTime                          time.Time           // last time the 
state was updated (needed for cleanup)
+       maxRunningApps                     uint64
+       runningApps                        uint64
+       allocatingAcceptedApps             map[string]bool
+       template                           *template.Template
+       queueEvents                        *schedEvt.QueueEvents
+       appQueueMapping                    *AppQueueMapping // appID mapping to 
queues
+       quotaChangePreemptionDelay         uint64
+       hasTriggerredQuotaChangePreemption bool
+       isQuotaChangePreemptionRunning     bool
 
        locking.RWMutex
 }
@@ -2076,3 +2078,27 @@ func (sq *Queue) recalculatePriority() int32 {
        sq.currentPriority = curr
        return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
 }
+
+func (sq *Queue) MarkTriggerredQuotaChangePreemption() {
+       sq.Lock()
+       defer sq.Unlock()
+       sq.hasTriggerredQuotaChangePreemption = true
+}
+
+func (sq *Queue) HasTriggerredQuotaChangePreemption() bool {
+       sq.RLock()
+       defer sq.RUnlock()
+       return sq.hasTriggerredQuotaChangePreemption
+}
+
+func (sq *Queue) MarkQuotaChangePreemptionRunning() {
+       sq.Lock()
+       defer sq.Unlock()
+       sq.isQuotaChangePreemptionRunning = true
+}
+
+func (sq *Queue) IsQuotaChangePreemptionRunning() bool {
+       sq.RLock()
+       defer sq.RUnlock()
+       return sq.isQuotaChangePreemptionRunning
+}
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go 
b/pkg/scheduler/objects/quota_change_preemptor.go
new file mode 100644
index 00000000..8634ba2b
--- /dev/null
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -0,0 +1,50 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+type QuotaChangePreemptionContext struct {
+       queue *Queue
+}
+
+func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext {
+       preemptor := &QuotaChangePreemptionContext{
+               queue: queue,
+       }
+       return preemptor
+}
+
+func (qcp *QuotaChangePreemptionContext) CheckPreconditions() bool {
+       if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() || 
qcp.queue.HasTriggerredQuotaChangePreemption() || 
qcp.queue.IsQuotaChangePreemptionRunning() {
+               return false
+       }
+       if 
qcp.queue.GetMaxResource().StrictlyGreaterThanOnlyExisting(qcp.queue.GetAllocatedResource())
 {
+               return false
+       }
+       return true
+}
+
+func (qcp *QuotaChangePreemptionContext) tryPreemption() {
+       // quota change preemption has started, so mark the flag
+       qcp.queue.MarkQuotaChangePreemptionRunning()
+
+       // Preemption logic goes here
+
+       // quota change preemption has really evicted victims, so mark the flag
+       qcp.queue.MarkTriggerredQuotaChangePreemption()
+}
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go 
b/pkg/scheduler/objects/quota_change_preemptor_test.go
new file mode 100644
index 00000000..f7cd673d
--- /dev/null
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -0,0 +1,101 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+       "testing"
+
+       "gotest.tools/v3/assert"
+
+       "github.com/apache/yunikorn-core/pkg/common/configs"
+       "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
+func TestQuotaChangeCheckPreconditions(t *testing.T) {
+       parentConfig := configs.QueueConfig{
+               Name:   "parent",
+               Parent: true,
+               Resources: configs.Resources{
+                       Max: map[string]string{"memory": "1000"},
+               },
+       }
+       parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
+       assert.NilError(t, err)
+
+       leafRes := configs.Resources{
+               Max: map[string]string{"memory": "1000"},
+       }
+       leaf, err := NewConfiguredQueue(configs.QueueConfig{
+               Name:      "leaf",
+               Resources: leafRes,
+       }, parent, false, nil)
+       assert.NilError(t, err)
+
+       dynamicLeaf, err := NewConfiguredQueue(configs.QueueConfig{
+               Name:      "dynamic-leaf",
+               Resources: leafRes,
+       }, parent, false, nil)
+       assert.NilError(t, err)
+       dynamicLeaf.isManaged = false
+
+       alreadyPreemptionRunning, err := NewConfiguredQueue(configs.QueueConfig{
+               Name:      "leaf-already-preemption-running",
+               Resources: leafRes,
+       }, parent, false, nil)
+       assert.NilError(t, err)
+       alreadyPreemptionRunning.MarkQuotaChangePreemptionRunning()
+
+       alreadyTriggeredPreemption, err := 
NewConfiguredQueue(configs.QueueConfig{
+               Name:      "leaf-already-triggerred-running",
+               Resources: leafRes,
+       }, parent, false, nil)
+       assert.NilError(t, err)
+       alreadyTriggeredPreemption.MarkTriggerredQuotaChangePreemption()
+
+       usageExceededMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
+               Name:      "leaf-usage-exceeded-max",
+               Resources: leafRes,
+       }, parent, false, nil)
+       assert.NilError(t, err)
+       usageExceededMaxQueue.allocatedResource = 
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2000})
+
+       testCases := []struct {
+               name               string
+               queue              *Queue
+               preconditionResult bool
+       }{
+               {"parent queue", parent, false},
+               {"leaf queue", leaf, false},
+               {"dynamic leaf queue", dynamicLeaf, false},
+               {"leaf queue, already preemption process started or running", 
alreadyPreemptionRunning, false},
+               {"leaf queue, already triggerred preemption", 
alreadyTriggeredPreemption, false},
+               {"leaf queue, usage exceeded max resources", 
usageExceededMaxQueue, true},
+       }
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       preemptor := NewQuotaChangePreemptor(tc.queue)
+                       assert.Equal(t, preemptor.CheckPreconditions(), 
tc.preconditionResult)
+                       if tc.preconditionResult {
+                               preemptor.tryPreemption()
+                               assert.Equal(t, 
tc.queue.HasTriggerredQuotaChangePreemption(), true)
+                               assert.Equal(t, 
tc.queue.IsQuotaChangePreemptionRunning(), true)
+                       }
+               })
+       }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to