This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new c5a6a970 [YUNIKORN-3146] Preempt victims (#1044)
c5a6a970 is described below

commit c5a6a970cfb40b6eee90db58c83517ca13a36266
Author: mani <[email protected]>
AuthorDate: Tue Nov 11 20:34:34 2025 +0530

    [YUNIKORN-3146] Preempt victims (#1044)
    
    Closes: #1044
    
    Signed-off-by: mani <[email protected]>
---
 pkg/scheduler/objects/allocation.go                |  5 ++
 pkg/scheduler/objects/application_test.go          |  7 +-
 pkg/scheduler/objects/events/ask_events.go         |  9 +++
 pkg/scheduler/objects/events/ask_events_test.go    | 20 ++++++
 pkg/scheduler/objects/preemption_test.go           | 13 ++--
 pkg/scheduler/objects/preemption_utilities_test.go |  1 +
 pkg/scheduler/objects/quota_change_preemptor.go    | 56 ++++++++++++++++
 .../objects/quota_change_preemptor_test.go         | 75 ++++++++++++++++++++++
 pkg/scheduler/objects/utilities_test.go            | 18 ++----
 9 files changed, 181 insertions(+), 23 deletions(-)

diff --git a/pkg/scheduler/objects/allocation.go 
b/pkg/scheduler/objects/allocation.go
index 2bcd7c2b..3b60c80b 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -481,6 +481,11 @@ func (a *Allocation) 
SendPreemptedBySchedulerEvent(preemptorAllocKey, preemptorA
        a.askEvents.SendPreemptedByScheduler(a.allocationKey, a.applicationID, 
preemptorAllocKey, preemptorAppId, preemptorQueuePath, a.GetAllocatedResource())
 }
 
+// SendPreemptedByQuotaChangeEvent updates the event system with the Quota 
change preemption event.
+func (a *Allocation) SendPreemptedByQuotaChangeEvent(queuePath string) {
+       a.askEvents.SendPreemptedByQuotaChange(a.allocationKey, 
a.applicationID, queuePath, a.GetAllocatedResource())
+}
+
 // GetAllocationLog returns a list of log entries corresponding to allocation 
preconditions not being met.
 func (a *Allocation) GetAllocationLog() []*AllocationLogEntry {
        a.RLock()
diff --git a/pkg/scheduler/objects/application_test.go 
b/pkg/scheduler/objects/application_test.go
index e25597ad..53688323 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2242,8 +2242,7 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
        result3 := 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 0}), true, 30*time.Second, &maxAttemptsExhausted, iterator, iterator, getNode)
        assert.Assert(t, result3 == nil, "result3 not expected")
        assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been 
preempted")
-       log := ask3.GetAllocationLog()
-       assert.Equal(t, log[0].Message, common.PreemptionMaxAttemptsExhausted)
+       assertAllocationLog(t, ask3, 
[]string{common.PreemptionMaxAttemptsExhausted, common.PreemptionDoesNotHelp})
        assert.Equal(t, maxAttemptsExhausted, 0)
 
        maxAttemptsDecrease := 10
@@ -2252,7 +2251,7 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
        result3 = 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 0}), true, 30*time.Second, &maxAttemptsDecrease, iterator, iterator, getNode)
        assert.Assert(t, result3 == nil, "result3 not expected")
        assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been 
preempted")
-       assertAllocationLog(t, ask3)
+       assertAllocationLog(t, ask3, 
[]string{common.PreemptionPreconditionsFailed, common.PreemptionDoesNotHelp})
        assert.Equal(t, maxAttemptsDecrease, 10)
 
        // pass the time and try again
@@ -2271,7 +2270,7 @@ func TestTryAllocatePreemptNode(t *testing.T) {
        result3 := 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Assert(t, result3 == nil, "result3 expected")
        assert.Assert(t, !allocs[1].IsPreempted(), "alloc1 should have been 
preempted")
-       assertAllocationLog(t, ask3)
+       assertAllocationLog(t, ask3, 
[]string{common.PreemptionPreconditionsFailed, common.PreemptionDoesNotHelp})
 
        // pass the time and try again
        ask3.createTime = ask3.createTime.Add(-30 * time.Second)
diff --git a/pkg/scheduler/objects/events/ask_events.go 
b/pkg/scheduler/objects/events/ask_events.go
index db657339..24542094 100644
--- a/pkg/scheduler/objects/events/ask_events.go
+++ b/pkg/scheduler/objects/events/ask_events.go
@@ -114,6 +114,15 @@ func (ae *AskEvents) SendPreemptedByScheduler(allocKey, 
appID, preemptorAllocKey
        ae.eventSystem.AddEvent(event)
 }
 
+func (ae *AskEvents) SendPreemptedByQuotaChange(allocKey, appID, queuePath 
string, allocatedResource *resources.Resource) {
+       if !ae.eventSystem.IsEventTrackingEnabled() {
+               return
+       }
+       message := fmt.Sprintf("Preempted by Quota change enforcement process 
in %s", queuePath)
+       event := events.CreateRequestEventRecord(allocKey, appID, message, 
allocatedResource)
+       ae.eventSystem.AddEvent(event)
+}
+
 func NewAskEvents(evt events.EventSystem) *AskEvents {
        return newAskEventsWithRate(evt, 15*time.Second, 1)
 }
diff --git a/pkg/scheduler/objects/events/ask_events_test.go 
b/pkg/scheduler/objects/events/ask_events_test.go
index d6680600..e0a755a9 100644
--- a/pkg/scheduler/objects/events/ask_events_test.go
+++ b/pkg/scheduler/objects/events/ask_events_test.go
@@ -198,3 +198,23 @@ func TestPreemptedBySchedulerEvents(t *testing.T) {
        assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
        assert.Equal(t, "Preempted by preemptor-0 from application 
preemptor-app-0 in root.parent.child1", event.Message)
 }
+
+func TestSendPreemptedByQuotaChange(t *testing.T) {
+       resource := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+       eventSystem := mock.NewEventSystemDisabled()
+       events := NewAskEvents(eventSystem)
+       events.SendPreemptedByQuotaChange("alloc-0", appID, 
"root.parent.child1", resource)
+       assert.Equal(t, 0, len(eventSystem.Events))
+
+       eventSystem = mock.NewEventSystem()
+       events = NewAskEvents(eventSystem)
+       events.SendPreemptedByQuotaChange("alloc-0", appID, 
"root.parent.child1", resource)
+       assert.Equal(t, 1, len(eventSystem.Events))
+       event := eventSystem.Events[0]
+       assert.Equal(t, "alloc-0", event.ObjectID)
+       assert.Equal(t, appID, event.ReferenceID)
+       assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+       assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+       assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+       assert.Equal(t, "Preempted by Quota change enforcement process in 
root.parent.child1", event.Message)
+}
diff --git a/pkg/scheduler/objects/preemption_test.go 
b/pkg/scheduler/objects/preemption_test.go
index 4cbf939a..c09884cb 100644
--- a/pkg/scheduler/objects/preemption_test.go
+++ b/pkg/scheduler/objects/preemption_test.go
@@ -168,7 +168,7 @@ func TestCheckPreconditions(t *testing.T) {
        preemptionAttemptsRemaining := 1
        result := 
app.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 2}), true, 1*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Check(t, result == nil, "unexpected result")
-       assertAllocationLog(t, ask)
+       assertAllocationLog(t, ask, 
[]string{common.PreemptionPreconditionsFailed, common.PreemptionDoesNotHelp})
        ask.preemptCheckTime = time.Now().Add(-1 * time.Minute)
        assert.Assert(t, preemptor.CheckPreconditions(), "preconditions failed")
        assert.Assert(t, !preemptor.CheckPreconditions(), "preconditions 
succeeded on successive run")
@@ -255,7 +255,7 @@ func 
TestCheckPreemptionQueueGuaranteesWithNoGuaranteedResources(t *testing.T) {
                        } else {
                                assert.Equal(t, result == nil, true, 
"unexpected resultType")
                                assert.Equal(t, len(ask3.GetAllocationLog()), 1)
-                               assert.Equal(t, 
ask3.GetAllocationLog()[0].Message, common.PreemptionDoesNotGuarantee)
+                               assertAllocationLog(t, ask3, 
[]string{common.PreemptionDoesNotGuarantee})
                        }
                })
        }
@@ -599,8 +599,7 @@ func 
TestTryPreemption_VictimsAvailable_InsufficientResource(t *testing.T) {
        assert.Equal(t, ok, false, "no victims found")
        assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
-       log := ask3.GetAllocationLog()
-       assert.Equal(t, log[0].Message, common.PreemptionShortfall)
+       assertAllocationLog(t, ask3, []string{common.PreemptionShortfall})
 }
 
 // TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource Test try 
preemption on queue with simple queue hierarchy. Since Node doesn't have enough 
resources to accommodate, preemption happens because of node resource constraint.
@@ -650,8 +649,7 @@ func 
TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource(t *testing.T
        assert.Equal(t, ok, false, "no victims found")
        assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
-       log := ask3.GetAllocationLog()
-       assert.Equal(t, log[0].Message, common.PreemptionShortfall)
+       assertAllocationLog(t, ask3, []string{common.PreemptionShortfall})
 }
 
 // TestTryPreemption_VictimsAvailableOnDifferentNodes Test try preemption on 
queue with simple queue hierarchy. Since Node doesn't have enough resources to 
accommodate, preemption happens because of node resource constraint.
@@ -718,8 +716,7 @@ func TestTryPreemption_VictimsAvailableOnDifferentNodes(t 
*testing.T) {
        assert.Equal(t, ok, false, "no victims found")
        assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
        assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
-       log := ask3.GetAllocationLog()
-       assert.Equal(t, log[0].Message, common.PreemptionShortfall)
+       assertAllocationLog(t, ask3, []string{common.PreemptionShortfall})
 }
 
 // TestTryPreemption_OnQueue_VictimsOnDifferentNodes Test try preemption on 
queue with simple queue hierarchy. Since Node has enough resources to 
accommodate, preemption happens because of queue resource constraint.
diff --git a/pkg/scheduler/objects/preemption_utilities_test.go 
b/pkg/scheduler/objects/preemption_utilities_test.go
index 46bd5d47..013d3f13 100644
--- a/pkg/scheduler/objects/preemption_utilities_test.go
+++ b/pkg/scheduler/objects/preemption_utilities_test.go
@@ -133,6 +133,7 @@ func prepareAllocationAsks(t *testing.T, node *Node) 
[]*Allocation {
 func removeAllocationAsks(node *Node, asks []*Allocation) {
        for _, ask := range asks {
                node.RemoveAllocation(ask.GetAllocationKey())
+               ask.preempted = false
        }
 }
 
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go 
b/pkg/scheduler/objects/quota_change_preemptor.go
index 40401360..6868ef89 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/log"
+       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
 type QuotaChangePreemptionContext struct {
@@ -138,3 +139,58 @@ func (qcp *QuotaChangePreemptionContext) sortAllocations() 
{
                SortAllocations(qcp.allocations)
        }
 }
+
+func (qcp *QuotaChangePreemptionContext) preemptVictims() {
+       if len(qcp.allocations) == 0 {
+               return
+       }
+       log.Log(log.ShedQuotaChangePreemption).Info("Found victims for quota 
change preemption",
+               zap.String("queue", qcp.queue.GetQueuePath()),
+               zap.Int("total victims", len(qcp.allocations)))
+       apps := make(map[*Application][]*Allocation)
+       victimsTotalResource := resources.NewResource()
+       selectedVictimsTotalResource := resources.NewResource()
+       for _, victim := range qcp.allocations {
+               // stop collecting victims once the preemptable resource requirement is met
+               if 
qcp.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
+                       application := 
qcp.queue.GetApplication(victim.applicationID)
+                       if _, ok := apps[application]; !ok {
+                               apps[application] = []*Allocation{}
+                       }
+                       apps[application] = append(apps[application], victim)
+                       
selectedVictimsTotalResource.AddTo(victim.GetAllocatedResource())
+               }
+               victimsTotalResource.AddTo(victim.GetAllocatedResource())
+       }
+
+       if 
qcp.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) ||
+               
selectedVictimsTotalResource.StrictlyGreaterThanOnlyExisting(qcp.preemptableResource)
 {
+               // either there is a shortfall or the selected victims exceed what is 
required, so try the "best effort" approach later
+               return
+       }
+
+       for app, victims := range apps {
+               if len(victims) > 0 {
+                       qcp.queue.MarkTriggerredQuotaChangePreemption()
+                       for _, victim := range victims {
+                               
log.Log(log.ShedQuotaChangePreemption).Info("Preempting victims for quota 
change preemption",
+                                       zap.String("queue", 
qcp.queue.GetQueuePath()),
+                                       zap.String("victim allocation key", 
victim.allocationKey),
+                                       zap.String("victim allocated 
resources", victim.GetAllocatedResource().String()),
+                                       zap.String("victim application", 
victim.applicationID),
+                                       zap.String("victim node", 
victim.GetNodeID()),
+                               )
+                               
qcp.queue.IncPreemptingResource(victim.GetAllocatedResource())
+                               victim.MarkPreempted()
+                               
victim.SendPreemptedByQuotaChangeEvent(qcp.queue.GetQueuePath())
+                       }
+                       app.notifyRMAllocationReleased(victims, 
si.TerminationType_PREEMPTED_BY_SCHEDULER,
+                               "preempting allocations to enforce new max 
quota for queue : "+qcp.queue.GetQueuePath())
+               }
+       }
+}
+
+// only for testing
+func (qcp *QuotaChangePreemptionContext) getVictims() []*Allocation {
+       return qcp.allocations
+}
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go 
b/pkg/scheduler/objects/quota_change_preemptor_test.go
index 1159adf2..328ca5c0 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -20,6 +20,7 @@ package objects
 
 import (
        "testing"
+       "time"
 
        "gotest.tools/v3/assert"
 
@@ -186,3 +187,77 @@ func TestQuotaChangeFilterVictims(t *testing.T) {
                })
        }
 }
+
+func TestQuotaChangePreemptVictims(t *testing.T) {
+       leaf, err := NewConfiguredQueue(configs.QueueConfig{
+               Name: "leaf",
+       }, nil, false, nil)
+       assert.NilError(t, err)
+
+       node := NewNode(&si.NodeInfo{
+               NodeID:     "node",
+               Attributes: nil,
+               SchedulableResource: &si.Resource{
+                       Resources: map[string]*si.Quantity{"first": {Value: 
100}},
+               },
+       })
+
+       createTime := time.Now()
+       suitableVictims := make([]*Allocation, 0)
+       notSuitableVictims := make([]*Allocation, 0)
+
+       alloc1 := createAllocation("ask1", "app1", node.NodeID, true, false, 
10, false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       alloc1.createTime = createTime.Add(-time.Minute * 3)
+       assert.Assert(t, node.TryAddAllocation(alloc1))
+       suitableVictims = append(suitableVictims, alloc1)
+       notSuitableVictims = append(notSuitableVictims, alloc1)
+
+       alloc2 := createAllocation("ask2", "app2", node.NodeID, true, false, 
10, false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       alloc2.createTime = createTime.Add(-time.Minute * 2)
+       assert.Assert(t, node.TryAddAllocation(alloc2))
+       suitableVictims = append(suitableVictims, alloc2)
+       notSuitableVictims = append(notSuitableVictims, alloc2)
+
+       alloc3 := createAllocation("ask3", "app3", node.NodeID, true, false, 
10, false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}))
+       alloc3.createTime = createTime.Add(-time.Minute * 1)
+       assert.Assert(t, node.TryAddAllocation(alloc3))
+       notSuitableVictims = append(notSuitableVictims, alloc3)
+
+       testCases := []struct {
+               name                 string
+               queue                *Queue
+               preemptableResource  *resources.Resource
+               victims              []*Allocation
+               totalExpectedVictims int
+               expectedVictimsCount int
+       }{
+               {"no victims available", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), 
[]*Allocation{}, 0, 0},
+               {"suitable victims available", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), 
suitableVictims, 2, 2},
+               {"not suitable victims available", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), 
notSuitableVictims, 3, 0},
+       }
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       asks := tc.victims
+                       assignAllocationsToQueue(asks, leaf)
+                       preemptor := NewQuotaChangePreemptor(tc.queue)
+                       preemptor.preemptableResource = tc.preemptableResource
+                       preemptor.allocations = asks
+                       preemptor.filterAllocations()
+                       preemptor.sortAllocations()
+                       preemptor.preemptVictims()
+                       assert.Equal(t, len(preemptor.getVictims()), 
tc.totalExpectedVictims)
+                       var victimsCount int
+                       for _, a := range asks {
+                               if a.IsPreempted() {
+                                       victimsCount++
+                               }
+                       }
+                       assert.Equal(t, victimsCount, tc.expectedVictimsCount)
+                       removeAllocationAsks(node, asks)
+                       removeAllocationFromQueue(leaf)
+               })
+       }
+}
diff --git a/pkg/scheduler/objects/utilities_test.go 
b/pkg/scheduler/objects/utilities_test.go
index b7ca66e7..255f6bc5 100644
--- a/pkg/scheduler/objects/utilities_test.go
+++ b/pkg/scheduler/objects/utilities_test.go
@@ -26,7 +26,6 @@ import (
        "github.com/google/btree"
        "gotest.tools/v3/assert"
 
-       "github.com/apache/yunikorn-core/pkg/common"
        "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-core/pkg/common/resources"
        "github.com/apache/yunikorn-core/pkg/common/security"
@@ -318,20 +317,17 @@ func assertUserResourcesAndGroupResources(t *testing.T, 
userGroup security.UserG
        }
 }
 
-func assertAllocationLog(t *testing.T, ask *Allocation) {
+func assertAllocationLog(t *testing.T, ask *Allocation, message []string) {
        log := ask.GetAllocationLog()
-       preemptionPreconditionsFailed := false
-       PreemptionDoesNotHelp := false
+       logged := false
        for _, l := range log {
-               switch l.Message {
-               case common.PreemptionPreconditionsFailed:
-                       preemptionPreconditionsFailed = true
-               case common.PreemptionDoesNotHelp:
-                       PreemptionDoesNotHelp = true
+               for _, m := range message {
+                       if l.Message == m {
+                               logged = true
+                       }
                }
        }
-       assert.Assert(t, preemptionPreconditionsFailed)
-       assert.Assert(t, PreemptionDoesNotHelp)
+       assert.Assert(t, logged)
 }
 
 func getNodeIteratorFn(nodes ...*Node) func() NodeIterator {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to