This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new c5a6a970 [YUNIKORN-3146] Preempt victims (#1044)
c5a6a970 is described below
commit c5a6a970cfb40b6eee90db58c83517ca13a36266
Author: mani <[email protected]>
AuthorDate: Tue Nov 11 20:34:34 2025 +0530
[YUNIKORN-3146] Preempt victims (#1044)
Closes: #1044
Signed-off-by: mani <[email protected]>
---
pkg/scheduler/objects/allocation.go | 5 ++
pkg/scheduler/objects/application_test.go | 7 +-
pkg/scheduler/objects/events/ask_events.go | 9 +++
pkg/scheduler/objects/events/ask_events_test.go | 20 ++++++
pkg/scheduler/objects/preemption_test.go | 13 ++--
pkg/scheduler/objects/preemption_utilities_test.go | 1 +
pkg/scheduler/objects/quota_change_preemptor.go | 56 ++++++++++++++++
.../objects/quota_change_preemptor_test.go | 75 ++++++++++++++++++++++
pkg/scheduler/objects/utilities_test.go | 18 ++----
9 files changed, 181 insertions(+), 23 deletions(-)
diff --git a/pkg/scheduler/objects/allocation.go
b/pkg/scheduler/objects/allocation.go
index 2bcd7c2b..3b60c80b 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -481,6 +481,11 @@ func (a *Allocation)
SendPreemptedBySchedulerEvent(preemptorAllocKey, preemptorA
a.askEvents.SendPreemptedByScheduler(a.allocationKey, a.applicationID,
preemptorAllocKey, preemptorAppId, preemptorQueuePath, a.GetAllocatedResource())
}
+// SendPreemptedByQuotaChangeEvent updates the event system with the Quota
change preemption event.
+func (a *Allocation) SendPreemptedByQuotaChangeEvent(queuePath string) {
+ a.askEvents.SendPreemptedByQuotaChange(a.allocationKey,
a.applicationID, queuePath, a.GetAllocatedResource())
+}
+
// GetAllocationLog returns a list of log entries corresponding to allocation
preconditions not being met.
func (a *Allocation) GetAllocationLog() []*AllocationLogEntry {
a.RLock()
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index e25597ad..53688323 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2242,8 +2242,7 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
result3 :=
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
0}), true, 30*time.Second, &maxAttemptsExhausted, iterator, iterator, getNode)
assert.Assert(t, result3 == nil, "result3 not expected")
assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been
preempted")
- log := ask3.GetAllocationLog()
- assert.Equal(t, log[0].Message, common.PreemptionMaxAttemptsExhausted)
+ assertAllocationLog(t, ask3,
[]string{common.PreemptionMaxAttemptsExhausted, common.PreemptionDoesNotHelp})
assert.Equal(t, maxAttemptsExhausted, 0)
maxAttemptsDecrease := 10
@@ -2252,7 +2251,7 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
result3 =
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
0}), true, 30*time.Second, &maxAttemptsDecrease, iterator, iterator, getNode)
assert.Assert(t, result3 == nil, "result3 not expected")
assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been
preempted")
- assertAllocationLog(t, ask3)
+ assertAllocationLog(t, ask3,
[]string{common.PreemptionPreconditionsFailed, common.PreemptionDoesNotHelp})
assert.Equal(t, maxAttemptsDecrease, 10)
// pass the time and try again
@@ -2271,7 +2270,7 @@ func TestTryAllocatePreemptNode(t *testing.T) {
result3 :=
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Assert(t, result3 == nil, "result3 expected")
assert.Assert(t, !allocs[1].IsPreempted(), "alloc1 should have been
preempted")
- assertAllocationLog(t, ask3)
+ assertAllocationLog(t, ask3,
[]string{common.PreemptionPreconditionsFailed, common.PreemptionDoesNotHelp})
// pass the time and try again
ask3.createTime = ask3.createTime.Add(-30 * time.Second)
diff --git a/pkg/scheduler/objects/events/ask_events.go
b/pkg/scheduler/objects/events/ask_events.go
index db657339..24542094 100644
--- a/pkg/scheduler/objects/events/ask_events.go
+++ b/pkg/scheduler/objects/events/ask_events.go
@@ -114,6 +114,15 @@ func (ae *AskEvents) SendPreemptedByScheduler(allocKey,
appID, preemptorAllocKey
ae.eventSystem.AddEvent(event)
}
+func (ae *AskEvents) SendPreemptedByQuotaChange(allocKey, appID, queuePath
string, allocatedResource *resources.Resource) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ message := fmt.Sprintf("Preempted by Quota change enforcement process
in %s", queuePath)
+ event := events.CreateRequestEventRecord(allocKey, appID, message,
allocatedResource)
+ ae.eventSystem.AddEvent(event)
+}
+
func NewAskEvents(evt events.EventSystem) *AskEvents {
return newAskEventsWithRate(evt, 15*time.Second, 1)
}
diff --git a/pkg/scheduler/objects/events/ask_events_test.go
b/pkg/scheduler/objects/events/ask_events_test.go
index d6680600..e0a755a9 100644
--- a/pkg/scheduler/objects/events/ask_events_test.go
+++ b/pkg/scheduler/objects/events/ask_events_test.go
@@ -198,3 +198,23 @@ func TestPreemptedBySchedulerEvents(t *testing.T) {
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, "Preempted by preemptor-0 from application
preemptor-app-0 in root.parent.child1", event.Message)
}
+
+func TestSendPreemptedByQuotaChange(t *testing.T) {
+ resource :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+ eventSystem := mock.NewEventSystemDisabled()
+ events := NewAskEvents(eventSystem)
+ events.SendPreemptedByQuotaChange("alloc-0", appID,
"root.parent.child1", resource)
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ eventSystem = mock.NewEventSystem()
+ events = NewAskEvents(eventSystem)
+ events.SendPreemptedByQuotaChange("alloc-0", appID,
"root.parent.child1", resource)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, "alloc-0", event.ObjectID)
+ assert.Equal(t, appID, event.ReferenceID)
+ assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+ assert.Equal(t, "Preempted by Quota change enforcement process in
root.parent.child1", event.Message)
+}
diff --git a/pkg/scheduler/objects/preemption_test.go
b/pkg/scheduler/objects/preemption_test.go
index 4cbf939a..c09884cb 100644
--- a/pkg/scheduler/objects/preemption_test.go
+++ b/pkg/scheduler/objects/preemption_test.go
@@ -168,7 +168,7 @@ func TestCheckPreconditions(t *testing.T) {
preemptionAttemptsRemaining := 1
result :=
app.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
2}), true, 1*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Check(t, result == nil, "unexpected result")
- assertAllocationLog(t, ask)
+ assertAllocationLog(t, ask,
[]string{common.PreemptionPreconditionsFailed, common.PreemptionDoesNotHelp})
ask.preemptCheckTime = time.Now().Add(-1 * time.Minute)
assert.Assert(t, preemptor.CheckPreconditions(), "preconditions failed")
assert.Assert(t, !preemptor.CheckPreconditions(), "preconditions
succeeded on successive run")
@@ -255,7 +255,7 @@ func
TestCheckPreemptionQueueGuaranteesWithNoGuaranteedResources(t *testing.T) {
} else {
assert.Equal(t, result == nil, true,
"unexpected resultType")
assert.Equal(t, len(ask3.GetAllocationLog()), 1)
- assert.Equal(t,
ask3.GetAllocationLog()[0].Message, common.PreemptionDoesNotGuarantee)
+ assertAllocationLog(t, ask3,
[]string{common.PreemptionDoesNotGuarantee})
}
})
}
@@ -599,8 +599,7 @@ func
TestTryPreemption_VictimsAvailable_InsufficientResource(t *testing.T) {
assert.Equal(t, ok, false, "no victims found")
assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
- log := ask3.GetAllocationLog()
- assert.Equal(t, log[0].Message, common.PreemptionShortfall)
+ assertAllocationLog(t, ask3, []string{common.PreemptionShortfall})
}
// TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource Test try
preemption on queue with simple queue hierarchy. Since Node doesn't have enough
resources to accommodate, preemption happens because of node resource constraint.
@@ -650,8 +649,7 @@ func
TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource(t *testing.T
assert.Equal(t, ok, false, "no victims found")
assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
- log := ask3.GetAllocationLog()
- assert.Equal(t, log[0].Message, common.PreemptionShortfall)
+ assertAllocationLog(t, ask3, []string{common.PreemptionShortfall})
}
// TestTryPreemption_VictimsAvailableOnDifferentNodes Test try preemption on
queue with simple queue hierarchy. Since Node doesn't have enough resources to
accommodate, preemption happens because of node resource constraint.
@@ -718,8 +716,7 @@ func TestTryPreemption_VictimsAvailableOnDifferentNodes(t
*testing.T) {
assert.Equal(t, ok, false, "no victims found")
assert.Check(t, !alloc1.IsPreempted(), "alloc1 preempted")
assert.Check(t, !alloc2.IsPreempted(), "alloc2 preempted")
- log := ask3.GetAllocationLog()
- assert.Equal(t, log[0].Message, common.PreemptionShortfall)
+ assertAllocationLog(t, ask3, []string{common.PreemptionShortfall})
}
// TestTryPreemption_OnQueue_VictimsOnDifferentNodes Test try preemption on
queue with simple queue hierarchy. Since Node has enough resources to
accommodate, preemption happens because of queue resource constraint.
diff --git a/pkg/scheduler/objects/preemption_utilities_test.go
b/pkg/scheduler/objects/preemption_utilities_test.go
index 46bd5d47..013d3f13 100644
--- a/pkg/scheduler/objects/preemption_utilities_test.go
+++ b/pkg/scheduler/objects/preemption_utilities_test.go
@@ -133,6 +133,7 @@ func prepareAllocationAsks(t *testing.T, node *Node)
[]*Allocation {
func removeAllocationAsks(node *Node, asks []*Allocation) {
for _, ask := range asks {
node.RemoveAllocation(ask.GetAllocationKey())
+ ask.preempted = false
}
}
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go
b/pkg/scheduler/objects/quota_change_preemptor.go
index 40401360..6868ef89 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/log"
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
type QuotaChangePreemptionContext struct {
@@ -138,3 +139,58 @@ func (qcp *QuotaChangePreemptionContext) sortAllocations()
{
SortAllocations(qcp.allocations)
}
}
+
+func (qcp *QuotaChangePreemptionContext) preemptVictims() {
+ if len(qcp.allocations) == 0 {
+ return
+ }
+ log.Log(log.ShedQuotaChangePreemption).Info("Found victims for quota
change preemption",
+ zap.String("queue", qcp.queue.GetQueuePath()),
+ zap.Int("total victims", len(qcp.allocations)))
+ apps := make(map[*Application][]*Allocation)
+ victimsTotalResource := resources.NewResource()
+ selectedVictimsTotalResource := resources.NewResource()
+ for _, victim := range qcp.allocations {
+ // stop collecting victims once the preemptable resource requirement is met
+ if
qcp.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) {
+ application :=
qcp.queue.GetApplication(victim.applicationID)
+ if _, ok := apps[application]; !ok {
+ apps[application] = []*Allocation{}
+ }
+ apps[application] = append(apps[application], victim)
+
selectedVictimsTotalResource.AddTo(victim.GetAllocatedResource())
+ }
+ victimsTotalResource.AddTo(victim.GetAllocatedResource())
+ }
+
+ if
qcp.preemptableResource.StrictlyGreaterThanOnlyExisting(victimsTotalResource) ||
+
selectedVictimsTotalResource.StrictlyGreaterThanOnlyExisting(qcp.preemptableResource)
{
+ // either there is a shortfall or the selected victims exceed what is
required, so try the "best effort" approach later
+ return
+ }
+
+ for app, victims := range apps {
+ if len(victims) > 0 {
+ qcp.queue.MarkTriggerredQuotaChangePreemption()
+ for _, victim := range victims {
+
log.Log(log.ShedQuotaChangePreemption).Info("Preempting victims for quota
change preemption",
+ zap.String("queue",
qcp.queue.GetQueuePath()),
+ zap.String("victim allocation key",
victim.allocationKey),
+ zap.String("victim allocated
resources", victim.GetAllocatedResource().String()),
+ zap.String("victim application",
victim.applicationID),
+ zap.String("victim node",
victim.GetNodeID()),
+ )
+
qcp.queue.IncPreemptingResource(victim.GetAllocatedResource())
+ victim.MarkPreempted()
+
victim.SendPreemptedByQuotaChangeEvent(qcp.queue.GetQueuePath())
+ }
+ app.notifyRMAllocationReleased(victims,
si.TerminationType_PREEMPTED_BY_SCHEDULER,
+ "preempting allocations to enforce new max
quota for queue : "+qcp.queue.GetQueuePath())
+ }
+ }
+}
+
+// only for testing
+func (qcp *QuotaChangePreemptionContext) getVictims() []*Allocation {
+ return qcp.allocations
+}
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go
b/pkg/scheduler/objects/quota_change_preemptor_test.go
index 1159adf2..328ca5c0 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -20,6 +20,7 @@ package objects
import (
"testing"
+ "time"
"gotest.tools/v3/assert"
@@ -186,3 +187,77 @@ func TestQuotaChangeFilterVictims(t *testing.T) {
})
}
}
+
+func TestQuotaChangePreemptVictims(t *testing.T) {
+ leaf, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "leaf",
+ }, nil, false, nil)
+ assert.NilError(t, err)
+
+ node := NewNode(&si.NodeInfo{
+ NodeID: "node",
+ Attributes: nil,
+ SchedulableResource: &si.Resource{
+ Resources: map[string]*si.Quantity{"first": {Value:
100}},
+ },
+ })
+
+ createTime := time.Now()
+ suitableVictims := make([]*Allocation, 0)
+ notSuitableVictims := make([]*Allocation, 0)
+
+ alloc1 := createAllocation("ask1", "app1", node.NodeID, true, false,
10, false,
+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ alloc1.createTime = createTime.Add(-time.Minute * 3)
+ assert.Assert(t, node.TryAddAllocation(alloc1))
+ suitableVictims = append(suitableVictims, alloc1)
+ notSuitableVictims = append(notSuitableVictims, alloc1)
+
+ alloc2 := createAllocation("ask2", "app2", node.NodeID, true, false,
10, false,
+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ alloc2.createTime = createTime.Add(-time.Minute * 2)
+ assert.Assert(t, node.TryAddAllocation(alloc2))
+ suitableVictims = append(suitableVictims, alloc2)
+ notSuitableVictims = append(notSuitableVictims, alloc2)
+
+ alloc3 := createAllocation("ask3", "app3", node.NodeID, true, false,
10, false,
+
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}))
+ alloc3.createTime = createTime.Add(-time.Minute * 1)
+ assert.Assert(t, node.TryAddAllocation(alloc2))
+ notSuitableVictims = append(notSuitableVictims, alloc3)
+
+ testCases := []struct {
+ name string
+ queue *Queue
+ preemptableResource *resources.Resource
+ victims []*Allocation
+ totalExpectedVictims int
+ expectedVictimsCount int
+ }{
+ {"no victims available", leaf,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}),
[]*Allocation{}, 0, 0},
+ {"suitable victims available", leaf,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}),
suitableVictims, 2, 2},
+ {"not suitable victims available", leaf,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}),
notSuitableVictims, 3, 0},
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ asks := tc.victims
+ assignAllocationsToQueue(asks, leaf)
+ preemptor := NewQuotaChangePreemptor(tc.queue)
+ preemptor.preemptableResource = tc.preemptableResource
+ preemptor.allocations = asks
+ preemptor.filterAllocations()
+ preemptor.sortAllocations()
+ preemptor.preemptVictims()
+ assert.Equal(t, len(preemptor.getVictims()),
tc.totalExpectedVictims)
+ var victimsCount int
+ for _, a := range asks {
+ if a.IsPreempted() {
+ victimsCount++
+ }
+ }
+ assert.Equal(t, victimsCount, tc.expectedVictimsCount)
+ removeAllocationAsks(node, asks)
+ removeAllocationFromQueue(leaf)
+ })
+ }
+}
diff --git a/pkg/scheduler/objects/utilities_test.go
b/pkg/scheduler/objects/utilities_test.go
index b7ca66e7..255f6bc5 100644
--- a/pkg/scheduler/objects/utilities_test.go
+++ b/pkg/scheduler/objects/utilities_test.go
@@ -26,7 +26,6 @@ import (
"github.com/google/btree"
"gotest.tools/v3/assert"
- "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
@@ -318,20 +317,17 @@ func assertUserResourcesAndGroupResources(t *testing.T,
userGroup security.UserG
}
}
-func assertAllocationLog(t *testing.T, ask *Allocation) {
+func assertAllocationLog(t *testing.T, ask *Allocation, message []string) {
log := ask.GetAllocationLog()
- preemptionPreconditionsFailed := false
- PreemptionDoesNotHelp := false
+ logged := false
for _, l := range log {
- switch l.Message {
- case common.PreemptionPreconditionsFailed:
- preemptionPreconditionsFailed = true
- case common.PreemptionDoesNotHelp:
- PreemptionDoesNotHelp = true
+ for _, m := range message {
+ if l.Message == m {
+ logged = true
+ }
}
}
- assert.Assert(t, preemptionPreconditionsFailed)
- assert.Assert(t, PreemptionDoesNotHelp)
+ assert.Assert(t, logged)
}
func getNodeIteratorFn(nodes ...*Node) func() NodeIterator {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]