This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 138d67f4 [YUNIKORN-3150] Trigger preemption from scheduling cycle 
(#1048)
138d67f4 is described below

commit 138d67f4fc51c5ad2c4e12800194e1768cab0e7d
Author: mani <[email protected]>
AuthorDate: Mon Nov 17 11:18:04 2025 +0530

    [YUNIKORN-3150] Trigger preemption from scheduling cycle (#1048)
    
    Closes: #1048
    
    Signed-off-by: mani <[email protected]>
---
 pkg/common/configs/configvalidator.go              |   7 +-
 pkg/common/configs/configvalidator_test.go         |   2 +-
 pkg/common/resources/resources.go                  |  27 ++++-
 pkg/common/resources/resources_test.go             |  67 +++++++++++
 pkg/scheduler/objects/preemption_utilities_test.go |   1 -
 pkg/scheduler/objects/queue.go                     | 128 +++++++++++++--------
 pkg/scheduler/objects/queue_test.go                |  60 ++++++++--
 pkg/scheduler/objects/quota_change_preemptor.go    |  12 +-
 .../objects/quota_change_preemptor_test.go         |  15 +--
 9 files changed, 238 insertions(+), 81 deletions(-)

diff --git a/pkg/common/configs/configvalidator.go 
b/pkg/common/configs/configvalidator.go
index c311823b..06a6f49d 100644
--- a/pkg/common/configs/configvalidator.go
+++ b/pkg/common/configs/configvalidator.go
@@ -85,6 +85,9 @@ var SpecialRegExp = regexp.MustCompile(`[\^$*+?()\[{}|]`)
 // The rule maps to a go identifier check that regexp only
 var RuleNameRegExp = regexp.MustCompile(`^[_a-zA-Z][a-zA-Z0-9_]*$`)
 
+// Minimum Quota change preemption delay
+var minQuotaChangePreemptionDelay uint64 = 60
+
 type placementStaticPath struct {
        path           string
        ruleChain      string
@@ -662,8 +665,8 @@ func checkQueues(queue *QueueConfig, level int) error {
                        return fmt.Errorf("duplicate child name found with name 
'%s', level %d", child.Name, level)
                }
                queueMap[strings.ToLower(child.Name)] = true
-               if queue.Preemption.Delay != 0 && queue.Preemption.Delay <= 60 {
-                       return fmt.Errorf("invalid preemption delay %d, must be 
greater than 60 seconds", queue.Preemption.Delay)
+               if queue.Preemption.Delay != 0 && queue.Preemption.Delay <= 
minQuotaChangePreemptionDelay {
+                       return fmt.Errorf("invalid preemption delay %d, must be 
between %d and %d", queue.Preemption.Delay, minQuotaChangePreemptionDelay, 
uint64(math.MaxUint64))
                }
        }
 
diff --git a/pkg/common/configs/configvalidator_test.go 
b/pkg/common/configs/configvalidator_test.go
index e62f4b3f..9b014c22 100644
--- a/pkg/common/configs/configvalidator_test.go
+++ b/pkg/common/configs/configvalidator_test.go
@@ -2343,7 +2343,7 @@ func TestCheckQueues(t *testing.T) { //nolint:funlen
                                },
                        },
                        level:            1,
-                       expectedErrorMsg: "invalid preemption delay 10, must be 
greater than 60 seconds",
+                       expectedErrorMsg: "invalid preemption delay 10, must be 
between 60 and 18446744073709551615",
                },
                {
                        name: "Setting Preemption delay on root queue would be 
ignored",
diff --git a/pkg/common/resources/resources.go 
b/pkg/common/resources/resources.go
index 04a956c1..3c51f061 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -878,6 +878,21 @@ func StrictlyGreaterThanOrEquals(larger, smaller 
*Resource) bool {
 // Types defined in smaller that are not in the defined resource are ignored.
 // Two resources that are equal are not considered strictly larger than each 
other.
 func (r *Resource) StrictlyGreaterThanOnlyExisting(smaller *Resource) bool {
+       return r.internalStrictlyOnlyExisting(smaller, false)
+}
+
+// StrictlyGreaterThanOrEqualsOnlyExisting returns true if all quantities for 
types in the defined resource are greater than or equals
+// the quantity for the same type in smaller.
+// Types defined in smaller that are not in the defined resource are ignored.
+func (r *Resource) StrictlyGreaterThanOrEqualsOnlyExisting(smaller *Resource) 
bool {
+       return r.internalStrictlyOnlyExisting(smaller, true)
+}
+
+// internalStrictlyOnlyExisting When doEqualsCheck is false, it returns true 
if all quantities for types in the defined resource are greater than
+// the quantity for the same type in smaller. When doEqualsCheck is true, it 
returns true if all quantities for types in the defined resource are greater 
than
+// or equals the quantity for the same type in smaller.
+// Types defined in smaller that are not in the defined resource are ignored.
+func (r *Resource) internalStrictlyOnlyExisting(smaller *Resource, 
doEqualsCheck bool) bool {
        if r == nil {
                r = Zero
        }
@@ -888,6 +903,9 @@ func (r *Resource) StrictlyGreaterThanOnlyExisting(smaller 
*Resource) bool {
        // keep track of the number of not equal values
        notEqual := false
 
+       // keep track of the number of equal values
+       equal := false
+
        // Is larger and smaller completely disjoint?
        atleastOneResourcePresent := false
        // Is all resource in larger greater than zero?
@@ -905,6 +923,9 @@ func (r *Resource) StrictlyGreaterThanOnlyExisting(smaller 
*Resource) bool {
                        if val > v {
                                return false
                        }
+                       if val <= v {
+                               equal = true
+                       }
                        if val != v {
                                notEqual = true
                        }
@@ -915,7 +936,11 @@ func (r *Resource) StrictlyGreaterThanOnlyExisting(smaller 
*Resource) bool {
        case smaller.IsEmpty() && !r.IsEmpty():
                return isAllPositiveInLarger
        case atleastOneResourcePresent:
-               return notEqual
+               if doEqualsCheck {
+                       return equal
+               } else {
+                       return notEqual
+               }
        default:
                // larger and smaller is completely disjoint. none of the 
resource match.
                return !r.IsEmpty() && !smaller.IsEmpty()
diff --git a/pkg/common/resources/resources_test.go 
b/pkg/common/resources/resources_test.go
index d9766ea7..a5f20793 100644
--- a/pkg/common/resources/resources_test.go
+++ b/pkg/common/resources/resources_test.go
@@ -432,6 +432,73 @@ func TestStrictlyGreaterThanOnlyExisting(t *testing.T) {
        }
 }
 
+func TestStrictlyGreaterThanOrEqualsOnlyExisting(t *testing.T) {
+       type inputs struct {
+               larger  map[string]Quantity
+               smaller map[string]Quantity
+               sameRef bool
+       }
+       type outputs struct {
+               larger  bool
+               smaller bool
+       }
+       var tests = []struct {
+               caseName string
+               input    inputs
+               expected outputs
+       }{
+               {"Nil check", inputs{nil, nil, false}, outputs{false, false}},
+               {"Positive resource and empty resources", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{}, false}, 
outputs{true, false}},
+               {"Positive resource and nil resources", 
inputs{map[string]Quantity{"first": 10}, nil, false}, outputs{true, false}},
+
+               {"Equal Positive resources", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 10}, 
false}, outputs{true, true}},
+               {"Different Positive resources", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 20}, 
false}, outputs{false, true}},
+               {"Different Positive resources", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 5}, 
false}, outputs{true, false}},
+
+               {"Equal Positive resources with extra resource types", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 10, 
"sec": 10}, false}, outputs{true, true}},
+               {"Different Positive resources with extra resource types", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 20, 
"sec": 10}, false}, outputs{false, true}},
+               {"Different Positive resources with extra resource types", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 5, "sec": 
10}, false}, outputs{true, false}},
+
+               {"Equal Positive resources but completely disjoint", 
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"sec": 10}, 
false}, outputs{true, true}},
+               {"Zero resource and empty resources", 
inputs{map[string]Quantity{"first": 0}, map[string]Quantity{}, false}, 
outputs{false, false}},
+               {"Zero resource and nil resources", 
inputs{map[string]Quantity{"first": 0}, nil, false}, outputs{false, false}},
+
+               {"Negative resource and empty resources", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{}, false}, 
outputs{false, false}},
+               {"Negative resource and nil resources", 
inputs{map[string]Quantity{"first": -10}, nil, false}, outputs{false, false}},
+               {"Negative resource and empty resources", 
inputs{map[string]Quantity{"first": -10, "sec": 10}, map[string]Quantity{}, 
false}, outputs{false, false}},
+               {"Negative resource and nil resources", 
inputs{map[string]Quantity{"first": -10, "sec": 10}, nil, false}, 
outputs{false, false}},
+
+               {"Equal Negative resources", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -10}, 
false}, outputs{true, true}},
+               {"Different Negative resources", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -20}, 
false}, outputs{true, false}},
+               {"Different Negative resources", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -5}, 
false}, outputs{false, true}},
+
+               {"Equal Negative resources with extra resource types", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -10, 
"sec": 10}, false}, outputs{true, true}},
+               {"Different Negative resources with extra resource types", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -20, 
"sec": 10}, false}, outputs{true, false}},
+               {"Different Negative resources with extra resource types", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -5, 
"sec": 10}, false}, outputs{false, true}},
+
+               {"Equal Negative resources but completely disjoint", 
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"sec": -10}, 
false}, outputs{true, true}},
+       }
+       for _, tt := range tests {
+               t.Run(tt.caseName, func(t *testing.T) {
+                       var compare, base *Resource
+                       if tt.input.larger != nil {
+                               compare = NewResourceFromMap(tt.input.larger)
+                       }
+                       if tt.input.sameRef {
+                               base = compare
+                       } else {
+                               base = NewResourceFromMap(tt.input.smaller)
+                       }
+                       if result := 
compare.StrictlyGreaterThanOrEqualsOnlyExisting(base); result != 
tt.expected.larger {
+                               t.Errorf("comapre %v, base %v, got %v, 
expeceted %v", compare, base, result, tt.expected.larger)
+                       }
+                       if result := 
base.StrictlyGreaterThanOrEqualsOnlyExisting(compare); result != 
tt.expected.smaller {
+                               t.Errorf("base %v, compare %v, got %v, 
expeceted %v", base, compare, result, tt.expected.smaller)
+                       }
+               })
+       }
+}
+
 func TestStrictlyGreaterThanOrEquals(t *testing.T) {
        type inputs struct {
                larger  map[string]Quantity
diff --git a/pkg/scheduler/objects/preemption_utilities_test.go 
b/pkg/scheduler/objects/preemption_utilities_test.go
index fa3fed1a..0d30e79c 100644
--- a/pkg/scheduler/objects/preemption_utilities_test.go
+++ b/pkg/scheduler/objects/preemption_utilities_test.go
@@ -158,7 +158,6 @@ func resetQueue(queue *Queue) {
        queue.maxResource = nil
        queue.allocatedResource = nil
        queue.guaranteedResource = nil
-       queue.hasTriggerredQuotaChangePreemption = false
        queue.isQuotaChangePreemptionRunning = false
        queue.preemptingResource = nil
 }
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index f21ef498..e6b6813e 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -74,24 +74,24 @@ type Queue struct {
        // The queue properties should be treated as immutable the value is a 
merge of the
        // parent properties with the config for this queue only manipulated 
during creation
        // of the queue or via a queue configuration update.
-       properties                         map[string]string
-       adminACL                           security.ACL        // admin ACL
-       submitACL                          security.ACL        // submit ACL
-       maxResource                        *resources.Resource // When not set, 
max = nil
-       guaranteedResource                 *resources.Resource // When not set, 
Guaranteed == 0
-       isLeaf                             bool                // this is a 
leaf queue or not (i.e. parent)
-       isManaged                          bool                // queue is part 
of the config, not auto created
-       stateMachine                       *fsm.FSM            // the state of 
the queue for scheduling
-       stateTime                          time.Time           // last time the 
state was updated (needed for cleanup)
-       maxRunningApps                     uint64
-       runningApps                        uint64
-       allocatingAcceptedApps             map[string]bool
-       template                           *template.Template
-       queueEvents                        *schedEvt.QueueEvents
-       appQueueMapping                    *AppQueueMapping // appID mapping to 
queues
-       quotaChangePreemptionDelay         uint64
-       hasTriggerredQuotaChangePreemption bool
-       isQuotaChangePreemptionRunning     bool
+       properties                     map[string]string
+       adminACL                       security.ACL        // admin ACL
+       submitACL                      security.ACL        // submit ACL
+       maxResource                    *resources.Resource // When not set, max 
= nil
+       guaranteedResource             *resources.Resource // When not set, 
Guaranteed == 0
+       isLeaf                         bool                // this is a leaf 
queue or not (i.e. parent)
+       isManaged                      bool                // queue is part of 
the config, not auto created
+       stateMachine                   *fsm.FSM            // the state of the 
queue for scheduling
+       stateTime                      time.Time           // last time the 
state was updated (needed for cleanup)
+       maxRunningApps                 uint64
+       runningApps                    uint64
+       allocatingAcceptedApps         map[string]bool
+       template                       *template.Template
+       queueEvents                    *schedEvt.QueueEvents
+       appQueueMapping                *AppQueueMapping // appID mapping to 
queues
+       quotaChangePreemptionDelay     uint64
+       quotaChangePreemptionStartTime time.Time
+       isQuotaChangePreemptionRunning bool
 
        locking.RWMutex
 }
@@ -99,22 +99,23 @@ type Queue struct {
 // newBlankQueue creates a new empty queue objects with all values initialised.
 func newBlankQueue() *Queue {
        return &Queue{
-               children:                   make(map[string]*Queue),
-               childPriorities:            make(map[string]int32),
-               applications:               make(map[string]*Application),
-               appPriorities:              make(map[string]int32),
-               reservedApps:               make(map[string]int),
-               allocatingAcceptedApps:     make(map[string]bool),
-               properties:                 make(map[string]string),
-               stateMachine:               NewObjectState(),
-               allocatedResource:          resources.NewResource(),
-               preemptingResource:         resources.NewResource(),
-               pending:                    resources.NewResource(),
-               currentPriority:            configs.MinPriority,
-               prioritySortEnabled:        true,
-               preemptionDelay:            configs.DefaultPreemptionDelay,
-               preemptionPolicy:           policies.DefaultPreemptionPolicy,
-               quotaChangePreemptionDelay: 0,
+               children:                       make(map[string]*Queue),
+               childPriorities:                make(map[string]int32),
+               applications:                   make(map[string]*Application),
+               appPriorities:                  make(map[string]int32),
+               reservedApps:                   make(map[string]int),
+               allocatingAcceptedApps:         make(map[string]bool),
+               properties:                     make(map[string]string),
+               stateMachine:                   NewObjectState(),
+               allocatedResource:              resources.NewResource(),
+               preemptingResource:             resources.NewResource(),
+               pending:                        resources.NewResource(),
+               currentPriority:                configs.MinPriority,
+               prioritySortEnabled:            true,
+               preemptionDelay:                configs.DefaultPreemptionDelay,
+               preemptionPolicy:               
policies.DefaultPreemptionPolicy,
+               quotaChangePreemptionDelay:     0,
+               quotaChangePreemptionStartTime: time.Time{},
        }
 }
 
@@ -402,27 +403,54 @@ func (sq *Queue) setPreemptionSettings(oldMaxResource 
*resources.Resource, conf
        // Set max res earlier but not now
        case resources.IsZero(newMaxResource) && 
!resources.IsZero(oldMaxResource):
                sq.quotaChangePreemptionDelay = 0
+               sq.quotaChangePreemptionStartTime = time.Time{}
                // Set max res now but not earlier
        case !resources.IsZero(newMaxResource) && 
resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
                sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+               sq.quotaChangePreemptionStartTime = 
time.Now().Add(time.Duration(int64(sq.quotaChangePreemptionDelay)) * 
time.Second) //nolint:gosec
                // Set max res earlier and now as well
        default:
                switch {
                // Quota decrease
                case resources.StrictlyGreaterThan(oldMaxResource, 
newMaxResource) && conf.Preemption.Delay != 0:
                        sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+                       sq.quotaChangePreemptionStartTime = 
time.Now().Add(time.Duration(sq.quotaChangePreemptionDelay) * time.Second) 
//nolint:gosec
                        // Quota increase
                case resources.StrictlyGreaterThan(newMaxResource, 
oldMaxResource) && conf.Preemption.Delay != 0:
                        sq.quotaChangePreemptionDelay = 0
+                       sq.quotaChangePreemptionStartTime = time.Time{}
                        // Quota remains as is but delay has changed
                case resources.Equals(oldMaxResource, newMaxResource) && 
conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay != 
conf.Preemption.Delay:
                        sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+                       sq.quotaChangePreemptionStartTime = 
time.Now().Add(time.Duration(sq.quotaChangePreemptionDelay) * time.Second) 
//nolint:gosec
                default:
                        // noop
                }
        }
 }
 
+// resetPreemptionSettings Reset Quota change preemption settings
+func (sq *Queue) resetPreemptionSettings() {
+       sq.Lock()
+       defer sq.Unlock()
+       sq.quotaChangePreemptionDelay = 0
+       sq.quotaChangePreemptionStartTime = time.Time{}
+}
+
+// shouldTriggerPreemption Should preemption be triggered or not to enforce 
new max quota?
+func (sq *Queue) shouldTriggerPreemption() bool {
+       sq.RLock()
+       defer sq.RUnlock()
+       return sq.quotaChangePreemptionDelay != 0 && 
!sq.quotaChangePreemptionStartTime.IsZero() && 
time.Now().After(sq.quotaChangePreemptionStartTime)
+}
+
+// getPreemptionSettings Get preemption settings. Only for testing
+func (sq *Queue) getPreemptionSettings() (uint64, time.Time) {
+       sq.RLock()
+       defer sq.RUnlock()
+       return sq.quotaChangePreemptionDelay, sq.quotaChangePreemptionStartTime
+}
+
 // setResourcesFromConf sets the maxResource and guaranteedResource of the 
queue from the config.
 func (sq *Queue) setResourcesFromConf(resource configs.Resources) error {
        maxResource, err := resources.NewResourceFromConf(resource.Max)
@@ -1506,6 +1534,22 @@ func (sq *Queue) TryAllocate(iterator func() 
NodeIterator, fullIterator func() N
                                return result
                        }
                }
+
+               // Should we trigger preemption to enforce new quota?
+               if sq.shouldTriggerPreemption() {
+                       go func() {
+                               log.Log(log.SchedQueue).Info("Trigger 
preemption to enforce new max resources",
+                                       zap.String("queueName", sq.QueuePath),
+                                       zap.String("max resources", 
sq.maxResource.String()))
+                               preemptor := NewQuotaChangePreemptor(sq)
+                               if preemptor.CheckPreconditions() {
+                                       
log.Log(log.SchedQueue).Info("Preconditions has passed to trigger preemption to 
enforce new max resources",
+                                               zap.String("queueName", 
sq.QueuePath),
+                                               zap.String("max resources", 
sq.maxResource.String()))
+                                       preemptor.tryPreemption()
+                               }
+                       }()
+               }
        } else {
                // process the child queues (filters out queues without pending 
requests)
                for _, child := range sq.sortQueues() {
@@ -2085,22 +2129,10 @@ func (sq *Queue) recalculatePriority() int32 {
        return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
 }
 
-func (sq *Queue) MarkTriggerredQuotaChangePreemption() {
-       sq.Lock()
-       defer sq.Unlock()
-       sq.hasTriggerredQuotaChangePreemption = true
-}
-
-func (sq *Queue) HasTriggerredQuotaChangePreemption() bool {
-       sq.RLock()
-       defer sq.RUnlock()
-       return sq.hasTriggerredQuotaChangePreemption
-}
-
-func (sq *Queue) MarkQuotaChangePreemptionRunning() {
+func (sq *Queue) MarkQuotaChangePreemptionRunning(run bool) {
        sq.Lock()
        defer sq.Unlock()
-       sq.isQuotaChangePreemptionRunning = true
+       sq.isQuotaChangePreemptionRunning = run
 }
 
 func (sq *Queue) IsQuotaChangePreemptionRunning() bool {
diff --git a/pkg/scheduler/objects/queue_test.go 
b/pkg/scheduler/objects/queue_test.go
index 67d5a4c2..64f9fcb4 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2274,9 +2274,9 @@ func TestQuotaChangePreemptionSettings(t *testing.T) {
                                Guaranteed: getResourceConf(),
                        },
                        Preemption: configs.Preemption{
-                               Delay: 100,
+                               Delay: 1,
                        },
-               }, 100},
+               }, 1},
                {"increase max with delay", configs.QueueConfig{
                        Resources: configs.Resources{
                                Max: map[string]string{"memory": "100000000"},
@@ -2290,26 +2290,26 @@ func TestQuotaChangePreemptionSettings(t *testing.T) {
                                Max: map[string]string{"memory": "100"},
                        },
                        Preemption: configs.Preemption{
-                               Delay: 500,
+                               Delay: 2,
                        },
-               }, 500},
+               }, 2},
                {"max remains as is but delay changed", configs.QueueConfig{
                        Resources: configs.Resources{
                                Max: map[string]string{"memory": "100"},
                        },
                        Preemption: configs.Preemption{
-                               Delay: 200,
+                               Delay: 2,
                        },
-               }, 200},
+               }, 2},
                {"unrelated config change, should not impact earlier set 
preemption settings", configs.QueueConfig{
                        Resources: configs.Resources{
                                Max:        map[string]string{"memory": "100"},
                                Guaranteed: map[string]string{"memory": "50"},
                        },
                        Preemption: configs.Preemption{
-                               Delay: 200,
+                               Delay: 2,
                        },
-               }, 200},
+               }, 2},
                {"increase max again with delay", configs.QueueConfig{
                        Resources: configs.Resources{
                                Max: map[string]string{"memory": "101"},
@@ -2323,7 +2323,49 @@ func TestQuotaChangePreemptionSettings(t *testing.T) {
                t.Run(tc.name, func(t *testing.T) {
                        err = parent.ApplyConf(tc.conf)
                        assert.NilError(t, err, "failed to apply conf: %v", err)
-                       assert.Equal(t, parent.quotaChangePreemptionDelay, 
tc.expectedDelay)
+
+                       // assert the preemption settings
+                       delay, sTime := parent.getPreemptionSettings()
+                       assert.Equal(t, delay, tc.expectedDelay)
+                       if tc.expectedDelay != uint64(0) {
+                               assert.Equal(t, sTime.IsZero(), false)
+                       } else {
+                               assert.Equal(t, sTime.IsZero(), true)
+                       }
+                       assert.Equal(t, parent.shouldTriggerPreemption(), false)
+
+                       used, err := 
resources.NewResourceFromConf(tc.conf.Resources.Max)
+                       assert.NilError(t, err, "failed to set allocated 
resource: %v", err)
+                       parent.allocatedResource = resources.Multiply(used, 2)
+
+                       // Wait till delay expires to let trigger preemption 
automatically
+                       time.Sleep(time.Duration(int64(tc.expectedDelay)+1) * 
time.Second)
+                       if tc.expectedDelay != uint64(0) {
+                               assert.Equal(t, 
parent.shouldTriggerPreemption(), true)
+                       }
+                       parent.TryAllocate(nil, nil, nil, false)
+
+                       // preemption settings should be same as before even 
now as trigger is async process
+                       delay, sTime = parent.getPreemptionSettings()
+                       assert.Equal(t, delay, tc.expectedDelay)
+                       if tc.expectedDelay != uint64(0) {
+                               assert.Equal(t, sTime.IsZero(), false)
+                               assert.Equal(t, 
parent.shouldTriggerPreemption(), true)
+                       } else {
+                               assert.Equal(t, sTime.IsZero(), true)
+                       }
+
+                       time.Sleep(time.Millisecond * 100)
+
+                       // preemption should have been triggered by now, assert 
preemption settings to ensure values are reset
+                       if tc.expectedDelay != uint64(0) {
+                               delay, sTime = parent.getPreemptionSettings()
+                               assert.Equal(t, sTime.IsZero(), true)
+                               assert.Equal(t, delay, uint64(0))
+
+                               // since preemption settings are set, 
preemption should not be triggerred again during tryAllocate
+                               assert.Equal(t, 
parent.shouldTriggerPreemption(), false)
+                       }
                })
        }
 }
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go 
b/pkg/scheduler/objects/quota_change_preemptor.go
index 8e67309f..018247ed 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -51,7 +51,7 @@ func NewQuotaChangePreemptor(queue *Queue) 
*QuotaChangePreemptionContext {
 }
 
 func (qcp *QuotaChangePreemptionContext) CheckPreconditions() bool {
-       if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() || 
qcp.queue.HasTriggerredQuotaChangePreemption() || 
qcp.queue.IsQuotaChangePreemptionRunning() {
+       if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() || 
qcp.queue.IsQuotaChangePreemptionRunning() {
                return false
        }
        if 
qcp.maxResource.StrictlyGreaterThanOnlyExisting(qcp.queue.GetAllocatedResource())
 {
@@ -62,7 +62,7 @@ func (qcp *QuotaChangePreemptionContext) CheckPreconditions() 
bool {
 
 func (qcp *QuotaChangePreemptionContext) tryPreemption() {
        // quota change preemption has started, so mark the flag
-       qcp.queue.MarkQuotaChangePreemptionRunning()
+       qcp.queue.MarkQuotaChangePreemptionRunning(true)
 
        // Get Preemptable Resource
        qcp.preemptableResource = qcp.getPreemptableResources()
@@ -76,8 +76,11 @@ func (qcp *QuotaChangePreemptionContext) tryPreemption() {
        // Preempt the victims
        qcp.preemptVictims()
 
-       // quota change preemption has really evicted victims, so mark the flag
-       qcp.queue.MarkTriggerredQuotaChangePreemption()
+       // quota change preemption has ended, so mark the flag
+       qcp.queue.MarkQuotaChangePreemptionRunning(false)
+
+       // reset settings
+       qcp.queue.resetPreemptionSettings()
 }
 
 // getPreemptableResources Get the preemptable resources for the queue
@@ -201,7 +204,6 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims() {
 
        for app, victims := range apps {
                if len(victims) > 0 {
-                       qcp.queue.MarkTriggerredQuotaChangePreemption()
                        for _, victim := range victims {
                                
log.Log(log.ShedQuotaChangePreemption).Info("Preempting victims for quota 
change preemption",
                                        zap.String("queue", 
qcp.queue.GetQueuePath()),
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go 
b/pkg/scheduler/objects/quota_change_preemptor_test.go
index 2be80120..ec2f3e81 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -62,14 +62,7 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
                Resources: leafRes,
        }, parent, false, nil)
        assert.NilError(t, err)
-       alreadyPreemptionRunning.MarkQuotaChangePreemptionRunning()
-
-       alreadyTriggeredPreemption, err := 
NewConfiguredQueue(configs.QueueConfig{
-               Name:      "leaf-already-triggerred-running",
-               Resources: leafRes,
-       }, parent, false, nil)
-       assert.NilError(t, err)
-       alreadyTriggeredPreemption.MarkTriggerredQuotaChangePreemption()
+       alreadyPreemptionRunning.MarkQuotaChangePreemptionRunning(true)
 
        usageExceededMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
                Name:      "leaf-usage-exceeded-max",
@@ -87,18 +80,12 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
                {"leaf queue", leaf, false},
                {"dynamic leaf queue", dynamicLeaf, false},
                {"leaf queue, already preemption process started or running", 
alreadyPreemptionRunning, false},
-               {"leaf queue, already triggerred preemption", 
alreadyTriggeredPreemption, false},
                {"leaf queue, usage exceeded max resources", 
usageExceededMaxQueue, true},
        }
        for _, tc := range testCases {
                t.Run(tc.name, func(t *testing.T) {
                        preemptor := NewQuotaChangePreemptor(tc.queue)
                        assert.Equal(t, preemptor.CheckPreconditions(), 
tc.preconditionResult)
-                       if tc.preconditionResult {
-                               preemptor.tryPreemption()
-                               assert.Equal(t, 
tc.queue.HasTriggerredQuotaChangePreemption(), true)
-                               assert.Equal(t, 
tc.queue.IsQuotaChangePreemptionRunning(), true)
-                       }
                })
        }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to