This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new a3a7ce54 [YUNIKORN-3149-1] Implement "best effort" approach for preempt victims
a3a7ce54 is described below
commit a3a7ce54520ad8a727816b2752d0d85a38a523ce
Author: mani <[email protected]>
AuthorDate: Thu Nov 13 15:20:05 2025 +0530
[YUNIKORN-3149-1] Implement "best effort" approach for preempt victims
Closes: #1047
Signed-off-by: mani <[email protected]>
---
pkg/scheduler/objects/preemption_utilities_test.go | 9 ++-
pkg/scheduler/objects/quota_change_preemptor.go | 60 ++++++++++++-----
.../objects/quota_change_preemptor_test.go | 76 +++++++++++++---------
3 files changed, 100 insertions(+), 45 deletions(-)
diff --git a/pkg/scheduler/objects/preemption_utilities_test.go
b/pkg/scheduler/objects/preemption_utilities_test.go
index 013d3f13..fa3fed1a 100644
--- a/pkg/scheduler/objects/preemption_utilities_test.go
+++ b/pkg/scheduler/objects/preemption_utilities_test.go
@@ -149,11 +149,18 @@ func assignAllocationsToQueue(allocations []*Allocation,
queue *Queue) {
app = queue.applications[allocation.applicationID]
}
app.AddAllocation(allocation)
+ queue.IncAllocatedResource(allocation.GetAllocatedResource())
}
}
-func removeAllocationFromQueue(queue *Queue) {
+func resetQueue(queue *Queue) {
queue.applications = make(map[string]*Application)
+ queue.maxResource = nil
+ queue.allocatedResource = nil
+ queue.guaranteedResource = nil
+ queue.hasTriggerredQuotaChangePreemption = false
+ queue.isQuotaChangePreemptionRunning = false
+ queue.preemptingResource = nil
}
// regular pods
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go
b/pkg/scheduler/objects/quota_change_preemptor.go
index 6868ef89..8e67309f 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -30,6 +30,9 @@ import (
type QuotaChangePreemptionContext struct {
queue *Queue
+ maxResource *resources.Resource
+ guaranteedResource *resources.Resource
+ allocatedResource *resources.Resource
preemptableResource *resources.Resource
allocations []*Allocation
}
@@ -37,9 +40,13 @@ type QuotaChangePreemptionContext struct {
func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext {
preemptor := &QuotaChangePreemptionContext{
queue: queue,
+ maxResource: queue.CloneMaxResource(),
+ guaranteedResource: queue.GetGuaranteedResource().Clone(),
+ allocatedResource: queue.GetAllocatedResource().Clone(),
preemptableResource: nil,
allocations: make([]*Allocation, 0),
}
+
return preemptor
}
@@ -47,7 +54,7 @@ func (qcp *QuotaChangePreemptionContext) CheckPreconditions()
bool {
if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() ||
qcp.queue.HasTriggerredQuotaChangePreemption() ||
qcp.queue.IsQuotaChangePreemptionRunning() {
return false
}
- if
qcp.queue.GetMaxResource().StrictlyGreaterThanOnlyExisting(qcp.queue.GetAllocatedResource())
{
+ if
qcp.maxResource.StrictlyGreaterThanOnlyExisting(qcp.queue.GetAllocatedResource())
{
return false
}
return true
@@ -66,6 +73,9 @@ func (qcp *QuotaChangePreemptionContext) tryPreemption() {
// Sort the allocations
qcp.sortAllocations()
+ // Preempt the victims
+ qcp.preemptVictims()
+
// quota change preemption has really evicted victims, so mark the flag
qcp.queue.MarkTriggerredQuotaChangePreemption()
}
@@ -75,7 +85,7 @@ func (qcp *QuotaChangePreemptionContext) tryPreemption() {
// It could contain both positive and negative values. Only negative values
are preemptable.
func (qcp *QuotaChangePreemptionContext) getPreemptableResources()
*resources.Resource {
maxRes := qcp.queue.CloneMaxResource()
- used := resources.SubOnlyExisting(qcp.queue.GetAllocatedResource(),
qcp.queue.GetPreemptingResource())
+ used := resources.SubOnlyExisting(qcp.allocatedResource,
qcp.queue.GetPreemptingResource())
if maxRes.IsEmpty() || used.IsEmpty() {
return nil
}
@@ -134,12 +144,17 @@ func (qcp *QuotaChangePreemptionContext)
filterAllocations() []*Allocation {
return allocations
}
+// sortAllocations Sort the allocations running in the queue
func (qcp *QuotaChangePreemptionContext) sortAllocations() {
if len(qcp.allocations) > 0 {
SortAllocations(qcp.allocations)
}
}
+// preemptVictims Preempt the victims to enforce the new max resources.
+// When both max and guaranteed resources are set and equal, to comply with
law of preemption "Ensure usage doesn't go below guaranteed resources",
+// preempt victims on best effort basis. So, preempt victims as close as
possible to the required resource.
+// Otherwise, exceeding above the required resources slightly is acceptable
for now.
func (qcp *QuotaChangePreemptionContext) preemptVictims() {
if len(qcp.allocations) == 0 {
return
@@ -149,24 +164,39 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims()
{
zap.Int("total victims", len(qcp.allocations)))
apps := make(map[*Application][]*Allocation)
victimsTotalResource := resources.NewResource()
- selectedVictimsTotalResource := resources.NewResource()
+ isGuaranteedAndMaxEquals := qcp.maxResource != nil &&
qcp.guaranteedResource != nil && resources.Equals(qcp.maxResource,
qcp.guaranteedResource)
+ log.Log(log.ShedQuotaChangePreemption).Info("Found victims for quota
change preemption",
+ zap.String("queue", qcp.queue.GetQueuePath()),
+ zap.Int("total victims", len(qcp.allocations)),
+ zap.String("max resources", qcp.maxResource.String()),
+ zap.String("guaranteed resources",
qcp.guaranteedResource.String()),
+ zap.String("allocated resources",
qcp.allocatedResource.String()),
+ zap.String("preemptable resources",
qcp.preemptableResource.String()),
+ zap.Bool("isGuaranteedSet", isGuaranteedAndMaxEquals),
+ )
for _, victim := range qcp.allocations {
- // stop collecting the victims once ask resource requirement met
+ if
!qcp.preemptableResource.FitInMaxUndef(victim.GetAllocatedResource()) {
+ continue
+ }
+ application := qcp.queue.GetApplication(victim.applicationID)
+
+ // Keep collecting the victims until preemptable resource
reaches and subtract the usage
if
qcp.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
- application :=
qcp.queue.GetApplication(victim.applicationID)
- if _, ok := apps[application]; !ok {
- apps[application] = []*Allocation{}
- }
apps[application] = append(apps[application], victim)
-
selectedVictimsTotalResource.AddTo(victim.GetAllocatedResource())
+
qcp.allocatedResource.SubFrom(victim.GetAllocatedResource())
}
- victimsTotalResource.AddTo(victim.GetAllocatedResource())
- }
- if
qcp.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) ||
-
selectedVictimsTotalResource.StrictlyGreaterThanOnlyExisting(qcp.preemptableResource)
{
- // either there is a shortfall or exceeding little above than
required, so try "best effort" approach later
- return
+ // Has usage gone below the guaranteed resources?
+ // If yes, revert the recently added victim steps completely
and try next victim.
+ if isGuaranteedAndMaxEquals &&
!qcp.allocatedResource.StrictlyGreaterThanOnlyExisting(qcp.guaranteedResource) {
+ victims := apps[application]
+ exceptRecentlyAddedVictims := victims[:len(victims)-1]
+ apps[application] = exceptRecentlyAddedVictims
+
qcp.allocatedResource.AddTo(victim.GetAllocatedResource())
+
victimsTotalResource.SubFrom(victim.GetAllocatedResource())
+ } else {
+
victimsTotalResource.AddTo(victim.GetAllocatedResource())
+ }
}
for app, victims := range apps {
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go
b/pkg/scheduler/objects/quota_change_preemptor_test.go
index 328ca5c0..2be80120 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -20,6 +20,7 @@ package objects
import (
"testing"
+
"time"
"gotest.tools/v3/assert"
@@ -183,12 +184,12 @@ func TestQuotaChangeFilterVictims(t *testing.T) {
allocations := preemptor.filterAllocations()
assert.Equal(t, len(allocations),
tc.expectedAllocationsCount)
removeAllocationAsks(node, asks)
- removeAllocationFromQueue(leaf)
+ resetQueue(leaf)
})
}
}
-func TestQuotaChangePreemptVictims(t *testing.T) {
+func TestQuotaChangeTryPreemption(t *testing.T) {
leaf, err := NewConfiguredQueue(configs.QueueConfig{
Name: "leaf",
}, nil, false, nil)
@@ -202,52 +203,61 @@ func TestQuotaChangePreemptVictims(t *testing.T) {
},
})
- createTime := time.Now()
suitableVictims := make([]*Allocation, 0)
- notSuitableVictims := make([]*Allocation, 0)
+ oversizedVictims := make([]*Allocation, 0)
+ overflowVictims := make([]*Allocation, 0)
+ shortfallVictims := make([]*Allocation, 0)
- alloc1 := createAllocation("ask1", "app1", node.NodeID, true, false,
10, false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
- alloc1.createTime = createTime.Add(-time.Minute * 3)
- assert.Assert(t, node.TryAddAllocation(alloc1))
- suitableVictims = append(suitableVictims, alloc1)
- notSuitableVictims = append(notSuitableVictims, alloc1)
+ suitableVictims = append(suitableVictims, createVictim(t, "ask1", node,
5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
+ suitableVictims = append(suitableVictims, createVictim(t, "ask2", node,
4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
- alloc2 := createAllocation("ask2", "app2", node.NodeID, true, false,
10, false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
- alloc2.createTime = createTime.Add(-time.Minute * 2)
- assert.Assert(t, node.TryAddAllocation(alloc2))
- suitableVictims = append(suitableVictims, alloc2)
- notSuitableVictims = append(notSuitableVictims, alloc2)
+ oversizedVictims = append(oversizedVictims, createVictim(t, "ask21",
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
9})))
+ oversizedVictims = append(oversizedVictims, createVictim(t, "ask3",
node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
11})))
- alloc3 := createAllocation("ask3", "app3", node.NodeID, true, false,
10, false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}))
- alloc3.createTime = createTime.Add(-time.Minute * 1)
- assert.Assert(t, node.TryAddAllocation(alloc2))
- notSuitableVictims = append(notSuitableVictims, alloc3)
+ overflowVictims = append(overflowVictims, createVictim(t, "ask4", node,
3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})))
+ overflowVictims = append(overflowVictims, createVictim(t, "ask41",
node, 2, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
6})))
+ overflowVictims = append(overflowVictims, createVictim(t, "ask42",
node, 1, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
9})))
+ shortfallVictims = append(shortfallVictims, createVictim(t, "ask5",
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
5})))
+ shortfallVictims = append(shortfallVictims, createVictim(t, "ask51",
node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
6})))
+ shortfallVictims = append(shortfallVictims, createVictim(t, "ask52",
node, 2, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
5})))
+ shortfallVictims = append(shortfallVictims, createVictim(t, "ask53",
node, 1, resources.NewResourceFromMap(map[string]resources.Quantity{"first":
4})))
+
+ oldMax :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20})
+ newMax :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+ preemptable := newMax
+ guaranteed := preemptable
+ lowerGuaranteed :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
testCases := []struct {
name string
queue *Queue
+ oldMax *resources.Resource
+ newMax *resources.Resource
+ guaranteed *resources.Resource
preemptableResource *resources.Resource
victims []*Allocation
totalExpectedVictims int
expectedVictimsCount int
}{
- {"no victims available", leaf,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}),
[]*Allocation{}, 0, 0},
- {"suitable victims available", leaf,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}),
suitableVictims, 2, 2},
- {"not suitable victims available", leaf,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}),
notSuitableVictims, 3, 0},
+ {"no victims available", leaf, oldMax, newMax, nil,
preemptable, []*Allocation{}, 0, 0},
+ {"suitable victims available", leaf, oldMax, newMax, nil,
preemptable, suitableVictims, 2, 1},
+ {"skip over sized victims", leaf, oldMax, newMax, nil,
preemptable, oversizedVictims, 2, 1},
+ {"guaranteed not set, victims total resource might go over the
requirement a bit", leaf, oldMax, newMax, nil, preemptable, overflowVictims, 3,
2},
+ {"guaranteed set but lower than max, victims total resource
might go over the requirement a bit", leaf, oldMax, newMax, lowerGuaranteed,
preemptable, overflowVictims, 3, 2},
+ {"best effort - guaranteed set and equals max, victims total
resource might fall below the requirement a bit", leaf, oldMax, newMax,
guaranteed, preemptable, shortfallVictims, 4, 2},
+ {"best effort - guaranteed set, max not set earlier but now,
victims total resource might fall below the requirement a bit", leaf, nil,
newMax, guaranteed, preemptable, shortfallVictims, 4, 2},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
+ leaf.maxResource = tc.oldMax
+ leaf.guaranteedResource = tc.guaranteed
asks := tc.victims
assignAllocationsToQueue(asks, leaf)
+ leaf.maxResource = tc.newMax
+ leaf.guaranteedResource = tc.guaranteed
preemptor := NewQuotaChangePreemptor(tc.queue)
- preemptor.preemptableResource = tc.preemptableResource
preemptor.allocations = asks
- preemptor.filterAllocations()
- preemptor.sortAllocations()
- preemptor.preemptVictims()
+ preemptor.tryPreemption()
assert.Equal(t, len(preemptor.getVictims()),
tc.totalExpectedVictims)
var victimsCount int
for _, a := range asks {
@@ -257,7 +267,15 @@ func TestQuotaChangePreemptVictims(t *testing.T) {
}
assert.Equal(t, victimsCount, tc.expectedVictimsCount)
removeAllocationAsks(node, asks)
- removeAllocationFromQueue(leaf)
+ resetQueue(leaf)
})
}
}
+
+func createVictim(t *testing.T, allocKey string, node *Node, adjustment int,
allocRes *resources.Resource) *Allocation {
+ createTime := time.Now()
+ allocation := createAllocation(allocKey, "app1", node.NodeID, true,
false, 10, false, allocRes)
+ allocation.createTime = createTime.Add(-time.Minute *
time.Duration(adjustment))
+ assert.Assert(t, node.TryAddAllocation(allocation))
+ return allocation
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]