This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new a3a7ce54 [YUNIKORN-3149-1] Implement "best effort" approach for preempt victims
a3a7ce54 is described below

commit a3a7ce54520ad8a727816b2752d0d85a38a523ce
Author: mani <[email protected]>
AuthorDate: Thu Nov 13 15:20:05 2025 +0530

    [YUNIKORN-3149-1] Implement "best effort" approach for preempt victims
    
    Closes: #1047
    
    Signed-off-by: mani <[email protected]>
---
 pkg/scheduler/objects/preemption_utilities_test.go |  9 ++-
 pkg/scheduler/objects/quota_change_preemptor.go    | 60 ++++++++++++-----
 .../objects/quota_change_preemptor_test.go         | 76 +++++++++++++---------
 3 files changed, 100 insertions(+), 45 deletions(-)

diff --git a/pkg/scheduler/objects/preemption_utilities_test.go 
b/pkg/scheduler/objects/preemption_utilities_test.go
index 013d3f13..fa3fed1a 100644
--- a/pkg/scheduler/objects/preemption_utilities_test.go
+++ b/pkg/scheduler/objects/preemption_utilities_test.go
@@ -149,11 +149,18 @@ func assignAllocationsToQueue(allocations []*Allocation, 
queue *Queue) {
                        app = queue.applications[allocation.applicationID]
                }
                app.AddAllocation(allocation)
+               queue.IncAllocatedResource(allocation.GetAllocatedResource())
        }
 }
 
-func removeAllocationFromQueue(queue *Queue) {
+// resetQueue replaces the queue's applications with an empty map, sets its max, allocated,
+// guaranteed and preempting resources to nil and clears both quota change preemption flags,
+// so that the next test case starts from a clean queue.
+func resetQueue(queue *Queue) {
        queue.applications = make(map[string]*Application)
+       queue.maxResource = nil
+       queue.allocatedResource = nil
+       queue.guaranteedResource = nil
+       queue.hasTriggerredQuotaChangePreemption = false
+       queue.isQuotaChangePreemptionRunning = false
+       queue.preemptingResource = nil
 }
 
 // regular pods
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go 
b/pkg/scheduler/objects/quota_change_preemptor.go
index 6868ef89..8e67309f 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -30,6 +30,9 @@ import (
 
+// QuotaChangePreemptionContext holds the state used to preempt allocations from a queue so that
+// its usage complies with a changed max resource.
+// maxResource, guaranteedResource and allocatedResource are clones of the queue values taken in
+// NewQuotaChangePreemptor; allocatedResource is a working copy that preemptVictims reduces while
+// it selects victims. allocations holds the candidate victims.
 type QuotaChangePreemptionContext struct {
        queue               *Queue
+       maxResource         *resources.Resource
+       guaranteedResource  *resources.Resource
+       allocatedResource   *resources.Resource
        preemptableResource *resources.Resource
        allocations         []*Allocation
 }
@@ -37,9 +40,13 @@ type QuotaChangePreemptionContext struct {
+// NewQuotaChangePreemptor creates the quota change preemption context for the given queue.
+// The queue's max, guaranteed and allocated resources are cloned at creation time;
+// preemptableResource starts as nil and the allocations list starts empty.
 func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext {
        preemptor := &QuotaChangePreemptionContext{
                queue:               queue,
+               maxResource:         queue.CloneMaxResource(),
+               guaranteedResource:  queue.GetGuaranteedResource().Clone(),
+               allocatedResource:   queue.GetAllocatedResource().Clone(),
                preemptableResource: nil,
                allocations:         make([]*Allocation, 0),
        }
+
        return preemptor
 }
 
@@ -47,7 +54,7 @@ func (qcp *QuotaChangePreemptionContext) CheckPreconditions() 
bool {
        if !qcp.queue.IsLeafQueue() || !qcp.queue.IsManaged() || 
qcp.queue.HasTriggerredQuotaChangePreemption() || 
qcp.queue.IsQuotaChangePreemptionRunning() {
                return false
        }
-       if 
qcp.queue.GetMaxResource().StrictlyGreaterThanOnlyExisting(qcp.queue.GetAllocatedResource())
 {
+       if 
qcp.maxResource.StrictlyGreaterThanOnlyExisting(qcp.queue.GetAllocatedResource())
 {
                return false
        }
        return true
@@ -66,6 +73,9 @@ func (qcp *QuotaChangePreemptionContext) tryPreemption() {
        // Sort the allocations
        qcp.sortAllocations()
 
+       // Preempt the victims
+       qcp.preemptVictims()
+
        // quota change preemption has really evicted victims, so mark the flag
        qcp.queue.MarkTriggerredQuotaChangePreemption()
 }
@@ -75,7 +85,7 @@ func (qcp *QuotaChangePreemptionContext) tryPreemption() {
 // It could contain both positive and negative values. Only negative values 
are preemptable.
 func (qcp *QuotaChangePreemptionContext) getPreemptableResources() 
*resources.Resource {
        maxRes := qcp.queue.CloneMaxResource()
-       used := resources.SubOnlyExisting(qcp.queue.GetAllocatedResource(), 
qcp.queue.GetPreemptingResource())
+       used := resources.SubOnlyExisting(qcp.allocatedResource, 
qcp.queue.GetPreemptingResource())
        if maxRes.IsEmpty() || used.IsEmpty() {
                return nil
        }
@@ -134,12 +144,17 @@ func (qcp *QuotaChangePreemptionContext) 
filterAllocations() []*Allocation {
        return allocations
 }
 
+// sortAllocations sorts the candidate victims held in qcp.allocations using SortAllocations.
+// It does nothing when there are no allocations.
 func (qcp *QuotaChangePreemptionContext) sortAllocations() {
        if len(qcp.allocations) > 0 {
                SortAllocations(qcp.allocations)
        }
 }
 
+// preemptVictims Preempt the victims to enforce the new max resources.
+// When both max and guaranteed resources are set and equal, to comply with 
law of preemption "Ensure usage doesn't go below guaranteed resources",
+// preempt victims on best effort basis. So, preempt victims as close as 
possible to the required resource.
+// Otherwise, exceeding above the required resources slightly is acceptable 
for now.
 func (qcp *QuotaChangePreemptionContext) preemptVictims() {
        if len(qcp.allocations) == 0 {
                return
@@ -149,24 +164,39 @@ func (qcp *QuotaChangePreemptionContext) preemptVictims() 
{
                zap.Int("total victims", len(qcp.allocations)))
        apps := make(map[*Application][]*Allocation)
        victimsTotalResource := resources.NewResource()
-       selectedVictimsTotalResource := resources.NewResource()
+       // Best effort mode: when max and guaranteed are both set and equal, usage must stay
+       // strictly above guaranteed, so any victim that would break that is given back.
+       isGuaranteedAndMaxEquals := qcp.maxResource != nil && qcp.guaranteedResource != nil && resources.Equals(qcp.maxResource, qcp.guaranteedResource)
+       log.Log(log.ShedQuotaChangePreemption).Info("Found victims for quota change preemption",
+               zap.String("queue", qcp.queue.GetQueuePath()),
+               zap.Int("total victims", len(qcp.allocations)),
+               zap.String("max resources", qcp.maxResource.String()),
+               zap.String("guaranteed resources", qcp.guaranteedResource.String()),
+               zap.String("allocated resources", qcp.allocatedResource.String()),
+               zap.String("preemptable resources", qcp.preemptableResource.String()),
+               zap.Bool("isGuaranteedAndMaxEquals", isGuaranteedAndMaxEquals),
+       )
        for _, victim := range qcp.allocations {
-               // stop collecting the victims once ask resource requirement met
+               // Skip victims that do not fit within the preemptable resource.
+               if !qcp.preemptableResource.FitInMaxUndef(victim.GetAllocatedResource()) {
+                       continue
+               }
+               application := qcp.queue.GetApplication(victim.applicationID)
+
+               // Keep selecting victims until their total covers the preemptable resource.
+               // victimsTotalResource only counts victims that were actually selected.
                if qcp.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
-                       application := qcp.queue.GetApplication(victim.applicationID)
-                       if _, ok := apps[application]; !ok {
-                               apps[application] = []*Allocation{}
-                       }
+                       qcp.allocatedResource.SubFrom(victim.GetAllocatedResource())
+                       // Would usage go below the guaranteed resources? If so, give this victim
+                       // back and try the next one. The check must stay inside the selection
+                       // branch: only a victim taken in this iteration may be given back.
+                       if isGuaranteedAndMaxEquals && !qcp.allocatedResource.StrictlyGreaterThanOnlyExisting(qcp.guaranteedResource) {
+                               qcp.allocatedResource.AddTo(victim.GetAllocatedResource())
+                               continue
+                       }
                        apps[application] = append(apps[application], victim)
-                       selectedVictimsTotalResource.AddTo(victim.GetAllocatedResource())
+                       victimsTotalResource.AddTo(victim.GetAllocatedResource())
                }
-               victimsTotalResource.AddTo(victim.GetAllocatedResource())
        }
-
-       if qcp.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) ||
-               selectedVictimsTotalResource.StrictlyGreaterThanOnlyExisting(qcp.preemptableResource) {
-               // either there is a shortfall or exceeding little above than required, so try "best effort" approach later
-               return
-       }
 
        for app, victims := range apps {
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go 
b/pkg/scheduler/objects/quota_change_preemptor_test.go
index 328ca5c0..2be80120 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -20,6 +20,7 @@ package objects
 
 import (
        "testing"
+
        "time"
 
        "gotest.tools/v3/assert"
@@ -183,12 +184,12 @@ func TestQuotaChangeFilterVictims(t *testing.T) {
                        allocations := preemptor.filterAllocations()
                        assert.Equal(t, len(allocations), 
tc.expectedAllocationsCount)
                        removeAllocationAsks(node, asks)
-                       removeAllocationFromQueue(leaf)
+                       resetQueue(leaf)
                })
        }
 }
 
-func TestQuotaChangePreemptVictims(t *testing.T) {
+func TestQuotaChangeTryPreemption(t *testing.T) {
        leaf, err := NewConfiguredQueue(configs.QueueConfig{
                Name: "leaf",
        }, nil, false, nil)
@@ -202,52 +203,61 @@ func TestQuotaChangePreemptVictims(t *testing.T) {
                },
        })
 
-       createTime := time.Now()
        suitableVictims := make([]*Allocation, 0)
-       notSuitableVictims := make([]*Allocation, 0)
+       oversizedVictims := make([]*Allocation, 0)
+       overflowVictims := make([]*Allocation, 0)
+       shortfallVictims := make([]*Allocation, 0)
 
-       alloc1 := createAllocation("ask1", "app1", node.NodeID, true, false, 
10, false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
-       alloc1.createTime = createTime.Add(-time.Minute * 3)
-       assert.Assert(t, node.TryAddAllocation(alloc1))
-       suitableVictims = append(suitableVictims, alloc1)
-       notSuitableVictims = append(notSuitableVictims, alloc1)
+       suitableVictims = append(suitableVictims, createVictim(t, "ask1", node, 
5, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
+       suitableVictims = append(suitableVictims, createVictim(t, "ask2", node, 
4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})))
 
-       alloc2 := createAllocation("ask2", "app2", node.NodeID, true, false, 
10, false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
-       alloc2.createTime = createTime.Add(-time.Minute * 2)
-       assert.Assert(t, node.TryAddAllocation(alloc2))
-       suitableVictims = append(suitableVictims, alloc2)
-       notSuitableVictims = append(notSuitableVictims, alloc2)
+       oversizedVictims = append(oversizedVictims, createVictim(t, "ask21", 
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
9})))
+       oversizedVictims = append(oversizedVictims, createVictim(t, "ask3", 
node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
11})))
 
-       alloc3 := createAllocation("ask3", "app3", node.NodeID, true, false, 
10, false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}))
-       alloc3.createTime = createTime.Add(-time.Minute * 1)
-       assert.Assert(t, node.TryAddAllocation(alloc2))
-       notSuitableVictims = append(notSuitableVictims, alloc3)
+       overflowVictims = append(overflowVictims, createVictim(t, "ask4", node, 
3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})))
+       overflowVictims = append(overflowVictims, createVictim(t, "ask41", 
node, 2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
6})))
+       overflowVictims = append(overflowVictims, createVictim(t, "ask42", 
node, 1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
9})))
 
+       shortfallVictims = append(shortfallVictims, createVictim(t, "ask5", 
node, 4, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
5})))
+       shortfallVictims = append(shortfallVictims, createVictim(t, "ask51", 
node, 3, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
6})))
+       shortfallVictims = append(shortfallVictims, createVictim(t, "ask52", 
node, 2, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
5})))
+       shortfallVictims = append(shortfallVictims, createVictim(t, "ask53", 
node, 1, resources.NewResourceFromMap(map[string]resources.Quantity{"first": 
4})))
+
+       oldMax := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20})
+       newMax := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+       preemptable := newMax
+       guaranteed := preemptable
+       lowerGuaranteed := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5})
        testCases := []struct {
                name                 string
                queue                *Queue
+               oldMax               *resources.Resource
+               newMax               *resources.Resource
+               guaranteed           *resources.Resource
                preemptableResource  *resources.Resource
                victims              []*Allocation
                totalExpectedVictims int
                expectedVictimsCount int
        }{
-               {"no victims available", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), 
[]*Allocation{}, 0, 0},
-               {"suitable victims available", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), 
suitableVictims, 2, 2},
-               {"not suitable victims available", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), 
notSuitableVictims, 3, 0},
+               {"no victims available", leaf, oldMax, newMax, nil, 
preemptable, []*Allocation{}, 0, 0},
+               {"suitable victims available", leaf, oldMax, newMax, nil, 
preemptable, suitableVictims, 2, 1},
+               {"skip over sized victims", leaf, oldMax, newMax, nil, 
preemptable, oversizedVictims, 2, 1},
+               {"guaranteed not set, victims total resource might go over the 
requirement a bit", leaf, oldMax, newMax, nil, preemptable, overflowVictims, 3, 
2},
+               {"guaranteed set but lower than max, victims total resource 
might go over the requirement a bit", leaf, oldMax, newMax, lowerGuaranteed, 
preemptable, overflowVictims, 3, 2},
+               {"best effort - guaranteed set and equals max, victims total 
resource might fall below the requirement a bit", leaf, oldMax, newMax, 
guaranteed, preemptable, shortfallVictims, 4, 2},
+               {"best effort - guaranteed set, max not set earlier but now, 
victims total resource might fall below the requirement a bit", leaf, nil, 
newMax, guaranteed, preemptable, shortfallVictims, 4, 2},
        }
        for _, tc := range testCases {
                t.Run(tc.name, func(t *testing.T) {
+                       leaf.maxResource = tc.oldMax
+                       leaf.guaranteedResource = tc.guaranteed
                        asks := tc.victims
                        assignAllocationsToQueue(asks, leaf)
+                       leaf.maxResource = tc.newMax
+                       leaf.guaranteedResource = tc.guaranteed
                        preemptor := NewQuotaChangePreemptor(tc.queue)
-                       preemptor.preemptableResource = tc.preemptableResource
                        preemptor.allocations = asks
-                       preemptor.filterAllocations()
-                       preemptor.sortAllocations()
-                       preemptor.preemptVictims()
+                       preemptor.tryPreemption()
                        assert.Equal(t, len(preemptor.getVictims()), 
tc.totalExpectedVictims)
                        var victimsCount int
                        for _, a := range asks {
@@ -257,7 +267,15 @@ func TestQuotaChangePreemptVictims(t *testing.T) {
                        }
                        assert.Equal(t, victimsCount, tc.expectedVictimsCount)
                        removeAllocationAsks(node, asks)
-                       removeAllocationFromQueue(leaf)
+                       resetQueue(leaf)
                })
        }
 }
+
+// createVictim creates an allocation keyed allocKey for application "app1" on the given node,
+// using allocRes as its resources, moves its creation time adjustment minutes into the past and
+// asserts that the node accepted it.
+func createVictim(t *testing.T, allocKey string, node *Node, adjustment int, allocRes *resources.Resource) *Allocation {
+       now := time.Now()
+       allocation := createAllocation(allocKey, "app1", node.NodeID, true, false, 10, false, allocRes)
+       age := time.Duration(adjustment) * time.Minute
+       allocation.createTime = now.Add(-age)
+       assert.Assert(t, node.TryAddAllocation(allocation))
+       return allocation
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to