This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 138d67f4 [YUNIKORN-3150] Trigger preemption from scheduling cycle
(#1048)
138d67f4 is described below
commit 138d67f4fc51c5ad2c4e12800194e1768cab0e7d
Author: mani <[email protected]>
AuthorDate: Mon Nov 17 11:18:04 2025 +0530
[YUNIKORN-3150] Trigger preemption from scheduling cycle (#1048)
Closes: #1048
Signed-off-by: mani <[email protected]>
---
pkg/common/configs/configvalidator.go | 7 +-
pkg/common/configs/configvalidator_test.go | 2 +-
pkg/common/resources/resources.go | 27 ++++-
pkg/common/resources/resources_test.go | 67 +++++++++++
pkg/scheduler/objects/preemption_utilities_test.go | 1 -
pkg/scheduler/objects/queue.go | 128 +++++++++++++--------
pkg/scheduler/objects/queue_test.go | 60 ++++++++--
pkg/scheduler/objects/quota_change_preemptor.go | 12 +-
.../objects/quota_change_preemptor_test.go | 15 +--
9 files changed, 238 insertions(+), 81 deletions(-)
diff --git a/pkg/common/configs/configvalidator.go
b/pkg/common/configs/configvalidator.go
index c311823b..06a6f49d 100644
--- a/pkg/common/configs/configvalidator.go
+++ b/pkg/common/configs/configvalidator.go
@@ -85,6 +85,9 @@ var SpecialRegExp = regexp.MustCompile(`[\^$*+?()\[{}|]`)
// The rule maps to a go identifier check that regexp only
var RuleNameRegExp = regexp.MustCompile(`^[_a-zA-Z][a-zA-Z0-9_]*$`)
+// Minimum Quota change preemption delay
+var minQuotaChangePreemptionDelay uint64 = 60
+
type placementStaticPath struct {
path string
ruleChain string
@@ -662,8 +665,8 @@ func checkQueues(queue *QueueConfig, level int) error {
return fmt.Errorf("duplicate child name found with name
'%s', level %d", child.Name, level)
}
queueMap[strings.ToLower(child.Name)] = true
- if queue.Preemption.Delay != 0 && queue.Preemption.Delay <= 60 {
- return fmt.Errorf("invalid preemption delay %d, must be
greater than 60 seconds", queue.Preemption.Delay)
+ if queue.Preemption.Delay != 0 && queue.Preemption.Delay <=
minQuotaChangePreemptionDelay {
+ return fmt.Errorf("invalid preemption delay %d, must be
between %d and %d", queue.Preemption.Delay, minQuotaChangePreemptionDelay,
uint64(math.MaxUint64))
}
}
diff --git a/pkg/common/configs/configvalidator_test.go
b/pkg/common/configs/configvalidator_test.go
index e62f4b3f..9b014c22 100644
--- a/pkg/common/configs/configvalidator_test.go
+++ b/pkg/common/configs/configvalidator_test.go
@@ -2343,7 +2343,7 @@ func TestCheckQueues(t *testing.T) { //nolint:funlen
},
},
level: 1,
- expectedErrorMsg: "invalid preemption delay 10, must be
greater than 60 seconds",
+ expectedErrorMsg: "invalid preemption delay 10, must be
between 60 and 18446744073709551615",
},
{
name: "Setting Preemption delay on root queue would be
ignored",
diff --git a/pkg/common/resources/resources.go
b/pkg/common/resources/resources.go
index 04a956c1..3c51f061 100644
--- a/pkg/common/resources/resources.go
+++ b/pkg/common/resources/resources.go
@@ -878,6 +878,21 @@ func StrictlyGreaterThanOrEquals(larger, smaller
*Resource) bool {
// Types defined in smaller that are not in the defined resource are ignored.
// Two resources that are equal are not considered strictly larger than each
other.
func (r *Resource) StrictlyGreaterThanOnlyExisting(smaller *Resource) bool {
+ return r.internalStrictlyOnlyExisting(smaller, false)
+}
+
+// StrictlyGreaterThanOrEqualsOnlyExisting returns true if all quantities for
types in the defined resource are greater than or equals
+// the quantity for the same type in smaller.
+// Types defined in smaller that are not in the defined resource are ignored.
+func (r *Resource) StrictlyGreaterThanOrEqualsOnlyExisting(smaller *Resource)
bool {
+ return r.internalStrictlyOnlyExisting(smaller, true)
+}
+
+// internalStrictlyOnlyExisting When doEqualsCheck is false, it returns true
if all quantities for types in the defined resource are greater than
+// the quantity for the same type in smaller. When doEqualsCheck is true, it
returns true if all quantities for types in the defined resource are greater
than
+// or equals the quantity for the same type in smaller.
+// Types defined in smaller that are not in the defined resource are ignored.
+func (r *Resource) internalStrictlyOnlyExisting(smaller *Resource,
doEqualsCheck bool) bool {
if r == nil {
r = Zero
}
@@ -888,6 +903,9 @@ func (r *Resource) StrictlyGreaterThanOnlyExisting(smaller
*Resource) bool {
// keep track of the number of not equal values
notEqual := false
+ // keep track of the number of equal values
+ equal := false
+
// Is larger and smaller completely disjoint?
atleastOneResourcePresent := false
// Is all resource in larger greater than zero?
@@ -905,6 +923,9 @@ func (r *Resource) StrictlyGreaterThanOnlyExisting(smaller
*Resource) bool {
if val > v {
return false
}
+ if val <= v {
+ equal = true
+ }
if val != v {
notEqual = true
}
@@ -915,7 +936,11 @@ func (r *Resource) StrictlyGreaterThanOnlyExisting(smaller
*Resource) bool {
case smaller.IsEmpty() && !r.IsEmpty():
return isAllPositiveInLarger
case atleastOneResourcePresent:
- return notEqual
+ if doEqualsCheck {
+ return equal
+ } else {
+ return notEqual
+ }
default:
// larger and smaller is completely disjoint. none of the
resource match.
return !r.IsEmpty() && !smaller.IsEmpty()
diff --git a/pkg/common/resources/resources_test.go
b/pkg/common/resources/resources_test.go
index d9766ea7..a5f20793 100644
--- a/pkg/common/resources/resources_test.go
+++ b/pkg/common/resources/resources_test.go
@@ -432,6 +432,73 @@ func TestStrictlyGreaterThanOnlyExisting(t *testing.T) {
}
}
+func TestStrictlyGreaterThanOrEqualsOnlyExisting(t *testing.T) {
+ type inputs struct {
+ larger map[string]Quantity
+ smaller map[string]Quantity
+ sameRef bool
+ }
+ type outputs struct {
+ larger bool
+ smaller bool
+ }
+ var tests = []struct {
+ caseName string
+ input inputs
+ expected outputs
+ }{
+ {"Nil check", inputs{nil, nil, false}, outputs{false, false}},
+ {"Positive resource and empty resources",
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{}, false},
outputs{true, false}},
+ {"Positive resource and nil resources",
inputs{map[string]Quantity{"first": 10}, nil, false}, outputs{true, false}},
+
+ {"Equal Positive resources",
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 10},
false}, outputs{true, true}},
+ {"Different Positive resources",
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 20},
false}, outputs{false, true}},
+ {"Different Positive resources",
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 5},
false}, outputs{true, false}},
+
+ {"Equal Positive resources with extra resource types",
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 10,
"sec": 10}, false}, outputs{true, true}},
+ {"Different Positive resources with extra resource types",
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 20,
"sec": 10}, false}, outputs{false, true}},
+ {"Different Positive resources with extra resource types",
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"first": 5, "sec":
10}, false}, outputs{true, false}},
+
+ {"Equal Positive resources but completely disjoint",
inputs{map[string]Quantity{"first": 10}, map[string]Quantity{"sec": 10},
false}, outputs{true, true}},
+ {"Zero resource and empty resources",
inputs{map[string]Quantity{"first": 0}, map[string]Quantity{}, false},
outputs{false, false}},
+ {"Zero resource and nil resources",
inputs{map[string]Quantity{"first": 0}, nil, false}, outputs{false, false}},
+
+ {"Negative resource and empty resources",
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{}, false},
outputs{false, false}},
+ {"Negative resource and nil resources",
inputs{map[string]Quantity{"first": -10}, nil, false}, outputs{false, false}},
+ {"Negative resource and empty resources",
inputs{map[string]Quantity{"first": -10, "sec": 10}, map[string]Quantity{},
false}, outputs{false, false}},
+ {"Negative resource and nil resources",
inputs{map[string]Quantity{"first": -10, "sec": 10}, nil, false},
outputs{false, false}},
+
+ {"Equal Negative resources",
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -10},
false}, outputs{true, true}},
+ {"Different Negative resources",
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -20},
false}, outputs{true, false}},
+ {"Different Negative resources",
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -5},
false}, outputs{false, true}},
+
+ {"Equal Negative resources with extra resource types",
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -10,
"sec": 10}, false}, outputs{true, true}},
+ {"Different Negative resources with extra resource types",
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -20,
"sec": 10}, false}, outputs{true, false}},
+ {"Different Negative resources with extra resource types",
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"first": -5,
"sec": 10}, false}, outputs{false, true}},
+
+ {"Equal Negative resources but completely disjoint",
inputs{map[string]Quantity{"first": -10}, map[string]Quantity{"sec": -10},
false}, outputs{true, true}},
+ }
+ for _, tt := range tests {
+ t.Run(tt.caseName, func(t *testing.T) {
+ var compare, base *Resource
+ if tt.input.larger != nil {
+ compare = NewResourceFromMap(tt.input.larger)
+ }
+ if tt.input.sameRef {
+ base = compare
+ } else {
+ base = NewResourceFromMap(tt.input.smaller)
+ }
+ if result :=
compare.StrictlyGreaterThanOrEqualsOnlyExisting(base); result !=
tt.expected.larger {
+ t.Errorf("comapre %v, base %v, got %v,
expeceted %v", compare, base, result, tt.expected.larger)
+ }
+ if result :=
base.StrictlyGreaterThanOrEqualsOnlyExisting(compare); result !=
tt.expected.smaller {
+ t.Errorf("base %v, compare %v, got %v,
expeceted %v", base, compare, result, tt.expected.smaller)
+ }
+ })
+ }
+}
+
func TestStrictlyGreaterThanOrEquals(t *testing.T) {
type inputs struct {
larger map[string]Quantity
diff --git a/pkg/scheduler/objects/preemption_utilities_test.go
b/pkg/scheduler/objects/preemption_utilities_test.go
index fa3fed1a..0d30e79c 100644
--- a/pkg/scheduler/objects/preemption_utilities_test.go
+++ b/pkg/scheduler/objects/preemption_utilities_test.go
@@ -158,7 +158,6 @@ func resetQueue(queue *Queue) {
queue.maxResource = nil
queue.allocatedResource = nil
queue.guaranteedResource = nil
- queue.hasTriggerredQuotaChangePreemption = false
queue.isQuotaChangePreemptionRunning = false
queue.preemptingResource = nil
}
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index f21ef498..e6b6813e 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -74,24 +74,24 @@ type Queue struct {
// The queue properties should be treated as immutable the value is a
merge of the
// parent properties with the config for this queue only manipulated
during creation
// of the queue or via a queue configuration update.
- properties map[string]string
- adminACL security.ACL // admin ACL
- submitACL security.ACL // submit ACL
- maxResource *resources.Resource // When not set,
max = nil
- guaranteedResource *resources.Resource // When not set,
Guaranteed == 0
- isLeaf bool // this is a
leaf queue or not (i.e. parent)
- isManaged bool // queue is part
of the config, not auto created
- stateMachine *fsm.FSM // the state of
the queue for scheduling
- stateTime time.Time // last time the
state was updated (needed for cleanup)
- maxRunningApps uint64
- runningApps uint64
- allocatingAcceptedApps map[string]bool
- template *template.Template
- queueEvents *schedEvt.QueueEvents
- appQueueMapping *AppQueueMapping // appID mapping to
queues
- quotaChangePreemptionDelay uint64
- hasTriggerredQuotaChangePreemption bool
- isQuotaChangePreemptionRunning bool
+ properties map[string]string
+ adminACL security.ACL // admin ACL
+ submitACL security.ACL // submit ACL
+ maxResource *resources.Resource // When not set, max
= nil
+ guaranteedResource *resources.Resource // When not set,
Guaranteed == 0
+ isLeaf bool // this is a leaf
queue or not (i.e. parent)
+ isManaged bool // queue is part of
the config, not auto created
+ stateMachine *fsm.FSM // the state of the
queue for scheduling
+ stateTime time.Time // last time the
state was updated (needed for cleanup)
+ maxRunningApps uint64
+ runningApps uint64
+ allocatingAcceptedApps map[string]bool
+ template *template.Template
+ queueEvents *schedEvt.QueueEvents
+ appQueueMapping *AppQueueMapping // appID mapping to
queues
+ quotaChangePreemptionDelay uint64
+ quotaChangePreemptionStartTime time.Time
+ isQuotaChangePreemptionRunning bool
locking.RWMutex
}
@@ -99,22 +99,23 @@ type Queue struct {
// newBlankQueue creates a new empty queue objects with all values initialised.
func newBlankQueue() *Queue {
return &Queue{
- children: make(map[string]*Queue),
- childPriorities: make(map[string]int32),
- applications: make(map[string]*Application),
- appPriorities: make(map[string]int32),
- reservedApps: make(map[string]int),
- allocatingAcceptedApps: make(map[string]bool),
- properties: make(map[string]string),
- stateMachine: NewObjectState(),
- allocatedResource: resources.NewResource(),
- preemptingResource: resources.NewResource(),
- pending: resources.NewResource(),
- currentPriority: configs.MinPriority,
- prioritySortEnabled: true,
- preemptionDelay: configs.DefaultPreemptionDelay,
- preemptionPolicy: policies.DefaultPreemptionPolicy,
- quotaChangePreemptionDelay: 0,
+ children: make(map[string]*Queue),
+ childPriorities: make(map[string]int32),
+ applications: make(map[string]*Application),
+ appPriorities: make(map[string]int32),
+ reservedApps: make(map[string]int),
+ allocatingAcceptedApps: make(map[string]bool),
+ properties: make(map[string]string),
+ stateMachine: NewObjectState(),
+ allocatedResource: resources.NewResource(),
+ preemptingResource: resources.NewResource(),
+ pending: resources.NewResource(),
+ currentPriority: configs.MinPriority,
+ prioritySortEnabled: true,
+ preemptionDelay: configs.DefaultPreemptionDelay,
+ preemptionPolicy:
policies.DefaultPreemptionPolicy,
+ quotaChangePreemptionDelay: 0,
+ quotaChangePreemptionStartTime: time.Time{},
}
}
@@ -402,27 +403,54 @@ func (sq *Queue) setPreemptionSettings(oldMaxResource
*resources.Resource, conf
// Set max res earlier but not now
case resources.IsZero(newMaxResource) &&
!resources.IsZero(oldMaxResource):
sq.quotaChangePreemptionDelay = 0
+ sq.quotaChangePreemptionStartTime = time.Time{}
// Set max res now but not earlier
case !resources.IsZero(newMaxResource) &&
resources.IsZero(oldMaxResource) && conf.Preemption.Delay != 0:
sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+ sq.quotaChangePreemptionStartTime =
time.Now().Add(time.Duration(int64(sq.quotaChangePreemptionDelay)) *
time.Second) //nolint:gosec
// Set max res earlier and now as well
default:
switch {
// Quota decrease
case resources.StrictlyGreaterThan(oldMaxResource,
newMaxResource) && conf.Preemption.Delay != 0:
sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+ sq.quotaChangePreemptionStartTime =
time.Now().Add(time.Duration(sq.quotaChangePreemptionDelay) * time.Second)
//nolint:gosec
// Quota increase
case resources.StrictlyGreaterThan(newMaxResource,
oldMaxResource) && conf.Preemption.Delay != 0:
sq.quotaChangePreemptionDelay = 0
+ sq.quotaChangePreemptionStartTime = time.Time{}
// Quota remains as is but delay has changed
case resources.Equals(oldMaxResource, newMaxResource) &&
conf.Preemption.Delay != 0 && sq.quotaChangePreemptionDelay !=
conf.Preemption.Delay:
sq.quotaChangePreemptionDelay = conf.Preemption.Delay
+ sq.quotaChangePreemptionStartTime =
time.Now().Add(time.Duration(sq.quotaChangePreemptionDelay) * time.Second)
//nolint:gosec
default:
// noop
}
}
}
+// resetPreemptionSettings Reset Quota change preemption settings
+func (sq *Queue) resetPreemptionSettings() {
+ sq.Lock()
+ defer sq.Unlock()
+ sq.quotaChangePreemptionDelay = 0
+ sq.quotaChangePreemptionStartTime = time.Time{}
+}
+
+// shouldTriggerPreemption Should preemption be triggered or not to enforce
new max quota?
+func (sq *Queue) shouldTriggerPreemption() bool {
+ sq.RLock()
+ defer sq.RUnlock()
+ return sq.quotaChangePreemptionDelay != 0 &&
!sq.quotaChangePreemptionStartTime.IsZero() &&
time.Now().After(sq.quotaChangePreemptionStartTime)
+}
+
+// getPreemptionSettings Get preemption settings. Only for testing
+func (sq *Queue) getPreemptionSettings() (uint64, time.Time) {
+ sq.RLock()
+ defer sq.RUnlock()
+ return sq.quotaChangePreemptionDelay, sq.quotaChangePreemptionStartTime
+}
+
// setResourcesFromConf sets the maxResource and guaranteedResource of the
queue from the config.
func (sq *Queue) setResourcesFromConf(resource configs.Resources) error {
maxResource, err := resources.NewResourceFromConf(resource.Max)
@@ -1506,6 +1534,22 @@ func (sq *Queue) TryAllocate(iterator func()
NodeIterator, fullIterator func() N
return result
}
}
+
+ // Should we trigger preemption to enforce new quota?
+ if sq.shouldTriggerPreemption() {
+ go func() {
+ log.Log(log.SchedQueue).Info("Trigger
preemption to enforce new max resources",
+ zap.String("queueName", sq.QueuePath),
+ zap.String("max resources",
sq.maxResource.String()))
+ preemptor := NewQuotaChangePreemptor(sq)
+ if preemptor.CheckPreconditions() {
+
log.Log(log.SchedQueue).Info("Preconditions has passed to trigger preemption to
enforce new max resources",
+ zap.String("queueName",
sq.QueuePath),
+ zap.String("max resources",
sq.maxResource.String()))
+ preemptor.tryPreemption()
+ }
+ }()
+ }
} else {
// process the child queues (filters out queues without pending
requests)
for _, child := range sq.sortQueues() {
@@ -2085,22 +2129,10 @@ func (sq *Queue) recalculatePriority() int32 {
return priorityValueByPolicy(sq.priorityPolicy, sq.priorityOffset, curr)
}
-func (sq *Queue) MarkTriggerredQuotaChangePreemption() {
- sq.Lock()
- defer sq.Unlock()
- sq.hasTriggerredQuotaChangePreemption = true
-}
-
-func (sq *Queue) HasTriggerredQuotaChangePreemption() bool {
- sq.RLock()
- defer sq.RUnlock()
- return sq.hasTriggerredQuotaChangePreemption
-}
-
-func (sq *Queue) MarkQuotaChangePreemptionRunning() {
+func (sq *Queue) MarkQuotaChangePreemptionRunning(run bool) {
sq.Lock()
defer sq.Unlock()
- sq.isQuotaChangePreemptionRunning = true
+ sq.isQuotaChangePreemptionRunning = run
}
func (sq *Queue) IsQuotaChangePreemptionRunning() bool {
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index 67d5a4c2..64f9fcb4 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2274,9 +2274,9 @@ func TestQuotaChangePreemptionSettings(t *testing.T) {
Guaranteed: getResourceConf(),
},
Preemption: configs.Preemption{
- Delay: 100,
+ Delay: 1,
},
- }, 100},
+ }, 1},
{"increase max with delay", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "100000000"},
@@ -2290,26 +2290,26 @@ func TestQuotaChangePreemptionSettings(t *testing.T) {
Max: map[string]string{"memory": "100"},
},
Preemption: configs.Preemption{
- Delay: 500,
+ Delay: 2,
},
- }, 500},
+ }, 2},
{"max remains as is but delay changed", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "100"},
},
Preemption: configs.Preemption{
- Delay: 200,
+ Delay: 2,
},
- }, 200},
+ }, 2},
{"unrelated config change, should not impact earlier set
preemption settings", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "100"},
Guaranteed: map[string]string{"memory": "50"},
},
Preemption: configs.Preemption{
- Delay: 200,
+ Delay: 2,
},
- }, 200},
+ }, 2},
{"increase max again with delay", configs.QueueConfig{
Resources: configs.Resources{
Max: map[string]string{"memory": "101"},
@@ -2323,7 +2323,49 @@ func TestQuotaChangePreemptionSettings(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
err = parent.ApplyConf(tc.conf)
assert.NilError(t, err, "failed to apply conf: %v", err)
- assert.Equal(t, parent.quotaChangePreemptionDelay,
tc.expectedDelay)
+
+ // assert the preemption settings
+ delay, sTime := parent.getPreemptionSettings()
+ assert.Equal(t, delay, tc.expectedDelay)
+ if tc.expectedDelay != uint64(0) {
+ assert.Equal(t, sTime.IsZero(), false)
+ } else {
+ assert.Equal(t, sTime.IsZero(), true)
+ }
+ assert.Equal(t, parent.shouldTriggerPreemption(), false)
+
+ used, err :=
resources.NewResourceFromConf(tc.conf.Resources.Max)
+ assert.NilError(t, err, "failed to set allocated
resource: %v", err)
+ parent.allocatedResource = resources.Multiply(used, 2)
+
+ // Wait till delay expires to let trigger preemption
automatically
+ time.Sleep(time.Duration(int64(tc.expectedDelay)+1) *
time.Second)
+ if tc.expectedDelay != uint64(0) {
+ assert.Equal(t,
parent.shouldTriggerPreemption(), true)
+ }
+ parent.TryAllocate(nil, nil, nil, false)
+
+ // preemption settings should be same as before even
now as trigger is async process
+ delay, sTime = parent.getPreemptionSettings()
+ assert.Equal(t, delay, tc.expectedDelay)
+ if tc.expectedDelay != uint64(0) {
+ assert.Equal(t, sTime.IsZero(), false)
+ assert.Equal(t,
parent.shouldTriggerPreemption(), true)
+ } else {
+ assert.Equal(t, sTime.IsZero(), true)
+ }
+
+ time.Sleep(time.Millisecond * 100)
+
+ // preemption should have been triggered by now, assert
preemption settings to ensure values are reset
+ if tc.expectedDelay != uint64(0) {
+ delay, sTime = parent.getPreemptionSettings()
+ assert.Equal(t, sTime.IsZero(), true)
+ assert.Equal(t, delay, uint64(0))
+
+ // since preemption settings are set,
preemption should not be triggerred again during tryAllocate
+ assert.Equal(t,
parent.shouldTriggerPreemption(), false)
+ }
})
}
}
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go
b/pkg/scheduler/objects/quota_change_preemptor.go
index 8e67309f..018247ed 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -51,7 +51,7 @@ func NewQuotaChangePreemptor(queue *Queue)
*QuotaChangePreemptionContext {
}
func (qcp *QuotaChangePreemptionContext) CheckPreconditions() bool {
- if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() ||
qcp.queue.HasTriggerredQuotaChangePreemption() ||
qcp.queue.IsQuotaChangePreemptionRunning() {
+ if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() ||
qcp.queue.IsQuotaChangePreemptionRunning() {
return false
}
if
qcp.maxResource.StrictlyGreaterThanOnlyExisting(qcp.queue.GetAllocatedResource())
{
@@ -62,7 +62,7 @@ func (qcp *QuotaChangePreemptionContext) CheckPreconditions()
bool {
func (qcp *QuotaChangePreemptionContext) tryPreemption() {
// quota change preemption has started, so mark the flag
- qcp.queue.MarkQuotaChangePreemptionRunning()
+ qcp.queue.MarkQuotaChangePreemptionRunning(true)
// Get Preemptable Resource
qcp.preemptableResource = qcp.getPreemptableResources()
@@ -76,8 +76,11 @@ func (qcp *QuotaChangePreemptionContext) tryPreemption() {
// Preempt the victims
qcp.preemptVictims()
- // quota change preemption has really evicted victims, so mark the flag
- qcp.queue.MarkTriggerredQuotaChangePreemption()
+ // quota change preemption has ended, so mark the flag
+ qcp.queue.MarkQuotaChangePreemptionRunning(false)
+
+ // reset settings
+ qcp.queue.resetPreemptionSettings()
}
// getPreemptableResources Get the preemptable resources for the queue
@@ -201,7 +204,6 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims() {
for app, victims := range apps {
if len(victims) > 0 {
- qcp.queue.MarkTriggerredQuotaChangePreemption()
for _, victim := range victims {
log.Log(log.ShedQuotaChangePreemption).Info("Preempting victims for quota
change preemption",
zap.String("queue",
qcp.queue.GetQueuePath()),
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go
b/pkg/scheduler/objects/quota_change_preemptor_test.go
index 2be80120..ec2f3e81 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -62,14 +62,7 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
Resources: leafRes,
}, parent, false, nil)
assert.NilError(t, err)
- alreadyPreemptionRunning.MarkQuotaChangePreemptionRunning()
-
- alreadyTriggeredPreemption, err :=
NewConfiguredQueue(configs.QueueConfig{
- Name: "leaf-already-triggerred-running",
- Resources: leafRes,
- }, parent, false, nil)
- assert.NilError(t, err)
- alreadyTriggeredPreemption.MarkTriggerredQuotaChangePreemption()
+ alreadyPreemptionRunning.MarkQuotaChangePreemptionRunning(true)
usageExceededMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
Name: "leaf-usage-exceeded-max",
@@ -87,18 +80,12 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
{"leaf queue", leaf, false},
{"dynamic leaf queue", dynamicLeaf, false},
{"leaf queue, already preemption process started or running",
alreadyPreemptionRunning, false},
- {"leaf queue, already triggerred preemption",
alreadyTriggeredPreemption, false},
{"leaf queue, usage exceeded max resources",
usageExceededMaxQueue, true},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
preemptor := NewQuotaChangePreemptor(tc.queue)
assert.Equal(t, preemptor.CheckPreconditions(),
tc.preconditionResult)
- if tc.preconditionResult {
- preemptor.tryPreemption()
- assert.Equal(t,
tc.queue.HasTriggerredQuotaChangePreemption(), true)
- assert.Equal(t,
tc.queue.IsQuotaChangePreemptionRunning(), true)
- }
})
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]