This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new caec1094 [YUNIKORN-3143] Victim ordering and selection (#1040)
caec1094 is described below
commit caec10943d783ee438686a4df6560e8168f235b8
Author: mani <[email protected]>
AuthorDate: Mon Nov 10 12:24:33 2025 +0530
[YUNIKORN-3143] Victim ordering and selection (#1040)
Closes: #1040
Signed-off-by: mani <[email protected]>
---
pkg/log/logger.go | 61 +++---
pkg/log/logger_test.go | 2 +-
pkg/scheduler/objects/preemption_utilities.go | 81 ++++++++
pkg/scheduler/objects/preemption_utilities_test.go | 206 +++++++++++++++++++++
pkg/scheduler/objects/quota_change_preemptor.go | 71 ++++++-
.../objects/quota_change_preemptor_test.go | 57 +++++-
pkg/scheduler/objects/required_node_preemptor.go | 66 +------
.../objects/required_node_preemptor_test.go | 164 ----------------
8 files changed, 446 insertions(+), 262 deletions(-)
diff --git a/pkg/log/logger.go b/pkg/log/logger.go
index b902556d..6ef29fe9 100644
--- a/pkg/log/logger.go
+++ b/pkg/log/logger.go
@@ -54,42 +54,43 @@ const (
// Defined loggers: when adding new loggers, ids must be sequential, and all
must be added to the loggers slice in the same order
var (
- Core = &LoggerHandle{id: 0, name: "core"}
- Test = &LoggerHandle{id: 1, name: "test"}
- Deprecation = &LoggerHandle{id: 2, name: "deprecation"}
- Config = &LoggerHandle{id: 3, name: "core.config"}
- Entrypoint = &LoggerHandle{id: 4, name: "core.entrypoint"}
- Events = &LoggerHandle{id: 5, name: "core.events"}
- OpenTracing = &LoggerHandle{id: 6, name: "core.opentracing"}
- Resources = &LoggerHandle{id: 7, name: "core.resources"}
- REST = &LoggerHandle{id: 8, name: "core.rest"}
- RMProxy = &LoggerHandle{id: 9, name: "core.rmproxy"}
- RPC = &LoggerHandle{id: 10, name: "core.rpc"}
- Metrics = &LoggerHandle{id: 11, name: "core.metrics"}
- Scheduler = &LoggerHandle{id: 12, name: "core.scheduler"}
- SchedAllocation = &LoggerHandle{id: 13, name:
"core.scheduler.allocation"}
- SchedApplication = &LoggerHandle{id: 14, name:
"core.scheduler.application"}
- SchedAppUsage = &LoggerHandle{id: 15, name:
"core.scheduler.application.usage"}
- SchedContext = &LoggerHandle{id: 16, name: "core.scheduler.context"}
- SchedFSM = &LoggerHandle{id: 17, name: "core.scheduler.fsm"}
- SchedHealth = &LoggerHandle{id: 18, name: "core.scheduler.health"}
- SchedNode = &LoggerHandle{id: 19, name: "core.scheduler.node"}
- SchedPartition = &LoggerHandle{id: 20, name:
"core.scheduler.partition"}
- SchedPreemption = &LoggerHandle{id: 21, name:
"core.scheduler.preemption"}
- SchedQueue = &LoggerHandle{id: 22, name: "core.scheduler.queue"}
- SchedReservation = &LoggerHandle{id: 23, name:
"core.scheduler.reservation"}
- SchedUGM = &LoggerHandle{id: 24, name: "core.scheduler.ugm"}
- SchedNodesUsage = &LoggerHandle{id: 25, name:
"core.scheduler.nodesusage"}
- Security = &LoggerHandle{id: 26, name: "core.security"}
- Utils = &LoggerHandle{id: 27, name: "core.utils"}
- Diagnostics = &LoggerHandle{id: 28, name: "core.diagnostics"}
+ Core = &LoggerHandle{id: 0, name: "core"}
+ Test = &LoggerHandle{id: 1, name: "test"}
+ Deprecation = &LoggerHandle{id: 2, name: "deprecation"}
+ Config = &LoggerHandle{id: 3, name: "core.config"}
+ Entrypoint = &LoggerHandle{id: 4, name:
"core.entrypoint"}
+ Events = &LoggerHandle{id: 5, name: "core.events"}
+ OpenTracing = &LoggerHandle{id: 6, name:
"core.opentracing"}
+ Resources = &LoggerHandle{id: 7, name: "core.resources"}
+ REST = &LoggerHandle{id: 8, name: "core.rest"}
+ RMProxy = &LoggerHandle{id: 9, name: "core.rmproxy"}
+ RPC = &LoggerHandle{id: 10, name: "core.rpc"}
+ Metrics = &LoggerHandle{id: 11, name: "core.metrics"}
+ Scheduler = &LoggerHandle{id: 12, name:
"core.scheduler"}
+ SchedAllocation = &LoggerHandle{id: 13, name:
"core.scheduler.allocation"}
+ SchedApplication = &LoggerHandle{id: 14, name:
"core.scheduler.application"}
+ SchedAppUsage = &LoggerHandle{id: 15, name:
"core.scheduler.application.usage"}
+ SchedContext = &LoggerHandle{id: 16, name:
"core.scheduler.context"}
+ SchedFSM = &LoggerHandle{id: 17, name:
"core.scheduler.fsm"}
+ SchedHealth = &LoggerHandle{id: 18, name:
"core.scheduler.health"}
+ SchedNode = &LoggerHandle{id: 19, name:
"core.scheduler.node"}
+ SchedPartition = &LoggerHandle{id: 20, name:
"core.scheduler.partition"}
+ SchedPreemption = &LoggerHandle{id: 21, name:
"core.scheduler.preemption"}
+ SchedQueue = &LoggerHandle{id: 22, name:
"core.scheduler.queue"}
+ SchedReservation = &LoggerHandle{id: 23, name:
"core.scheduler.reservation"}
+ SchedUGM = &LoggerHandle{id: 24, name:
"core.scheduler.ugm"}
+ SchedNodesUsage = &LoggerHandle{id: 25, name:
"core.scheduler.nodesusage"}
+ Security = &LoggerHandle{id: 26, name: "core.security"}
+ Utils = &LoggerHandle{id: 27, name: "core.utils"}
+ Diagnostics = &LoggerHandle{id: 28, name:
"core.diagnostics"}
+ ShedQuotaChangePreemption = &LoggerHandle{id: 29, name:
"core.scheduler.preemption.quotachange"}
)
// this tracks all the known logger handles, used to preallocate the real
logger instances when configuration changes
var loggers = []*LoggerHandle{
Core, Test, Deprecation, Config, Entrypoint, Events, OpenTracing,
Resources, REST, RMProxy, RPC, Metrics,
Scheduler, SchedAllocation, SchedApplication, SchedAppUsage,
SchedContext, SchedFSM, SchedHealth, SchedNode,
- SchedPartition, SchedPreemption, SchedQueue, SchedReservation,
SchedUGM, SchedNodesUsage, Security, Utils, Diagnostics,
+ SchedPartition, SchedPreemption, SchedQueue, SchedReservation,
SchedUGM, SchedNodesUsage, Security, Utils, Diagnostics,
ShedQuotaChangePreemption,
}
// structure to hold all current logger configuration state
diff --git a/pkg/log/logger_test.go b/pkg/log/logger_test.go
index 82e31f92..344ae892 100644
--- a/pkg/log/logger_test.go
+++ b/pkg/log/logger_test.go
@@ -39,7 +39,7 @@ func TestLoggerIDs(t *testing.T) {
_ = Log(Test)
// validate logger count
- assert.Equal(t, 29, len(loggers), "wrong logger count")
+ assert.Equal(t, 30, len(loggers), "wrong logger count")
// validate that all loggers are populated and have sequential ids
for i := 0; i < len(loggers); i++ {
diff --git a/pkg/scheduler/objects/preemption_utilities.go
b/pkg/scheduler/objects/preemption_utilities.go
new file mode 100644
index 00000000..c522ed68
--- /dev/null
+++ b/pkg/scheduler/objects/preemption_utilities.go
@@ -0,0 +1,81 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+ "sort"
+
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
+// SortAllocations sorts the given allocations in place so that the most
+// suitable victims come first. The criteria are applied in this order:
+//  1. By type (regular pods, opted out pods, driver/owner pods),
+//  2. By priority (least priority ask placed first),
+//  3. By create time or age of the ask (younger ask placed first),
+//  4. By resource (ask with lesser allocated resources placed first)
+//
+// Allocations that are equal on every criterion keep their relative input order.
+func SortAllocations(allocations []*Allocation) {
+	// askType ranks an allocation by its type:
+	// 1 = regular pod, 2 = opted out pod, 3 = driver/owner pod
+	askType := func(alloc *Allocation) int {
+		switch {
+		case alloc.IsOriginator():
+			return 3
+		case !alloc.IsAllowPreemptSelf():
+			return 2
+		default:
+			return 1
+		}
+	}
+
+	sort.SliceStable(allocations, func(i, j int) bool {
+		l, r := allocations[i], allocations[j]
+
+		// sort based on the type
+		if lAskType, rAskType := askType(l), askType(r); lAskType != rAskType {
+			return lAskType < rAskType
+		}
+
+		// sort based on the priority
+		if lPriority, rPriority := l.GetPriority(), r.GetPriority(); lPriority != rPriority {
+			return lPriority < rPriority
+		}
+
+		// sort based on the age: the ask created later (younger) is placed first
+		if !l.GetCreateTime().Equal(r.GetCreateTime()) {
+			return l.GetCreateTime().After(r.GetCreateTime())
+		}
+
+		// sort based on the allocated resource
+		// NOTE(review): when the two resources differ in opposite directions for
+		// different resource types this check may hold for both (l, r) and (r, l);
+		// confirm against the semantics of resources.StrictlyGreaterThanZero.
+		lResource := l.GetAllocatedResource()
+		rResource := r.GetAllocatedResource()
+		if !resources.Equals(lResource, rResource) {
+			delta := resources.Sub(lResource, rResource)
+			return !resources.StrictlyGreaterThanZero(delta)
+		}
+
+		// Equal on every criterion: neither element is "less" than the other.
+		// Returning true here would make less(i, j) and less(j, i) both hold,
+		// which violates the ordering contract of the sort package and breaks
+		// the stability guarantee of sort.SliceStable.
+		return false
+	})
+}
diff --git a/pkg/scheduler/objects/preemption_utilities_test.go
b/pkg/scheduler/objects/preemption_utilities_test.go
new file mode 100644
index 00000000..46bd5d47
--- /dev/null
+++ b/pkg/scheduler/objects/preemption_utilities_test.go
@@ -0,0 +1,206 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+ "testing"
+ "time"
+
+ "gotest.tools/v3/assert"
+
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+ siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+// createAllocationAsk builds a test allocation through NewAllocationFromSI for
+// the given application in the "default" partition. No node ID is set.
+// allowPreemption is used as the AllowPreemptSelf flag of the preemption policy;
+// AllowPreemptOther is always true. The allocation gets an empty tag map.
+func createAllocationAsk(allocationKey string, app string, allowPreemption bool, isOriginator bool, priority int32, res *resources.Resource) *Allocation {
+	policy := &si.PreemptionPolicy{
+		AllowPreemptSelf:  allowPreemption,
+		AllowPreemptOther: true,
+	}
+	return NewAllocationFromSI(&si.Allocation{
+		AllocationKey:    allocationKey,
+		ApplicationID:    app,
+		PartitionName:    "default",
+		Priority:         priority,
+		ResourcePerAlloc: res.ToProto(),
+		Originator:       isOriginator,
+		PreemptionPolicy: policy,
+		AllocationTags:   map[string]string{},
+	})
+}
+
+// createAllocation builds a test allocation through NewAllocationFromSI for the
+// given application in the "default" partition, with NodeID set to nodeID.
+// allowPreemption is used as the AllowPreemptSelf flag of the preemption policy;
+// AllowPreemptOther is always true. When requiredNode is set, the required node
+// tag (siCommon.DomainYuniKorn+siCommon.KeyRequiredNode) is added with nodeID as
+// its value.
+func createAllocation(allocationKey string, app string, nodeID string, allowPreemption bool, isOriginator bool, priority int32, requiredNode bool, res *resources.Resource) *Allocation {
+	allocTags := make(map[string]string)
+	if requiredNode {
+		allocTags[siCommon.DomainYuniKorn+siCommon.KeyRequiredNode] = nodeID
+	}
+	policy := &si.PreemptionPolicy{
+		AllowPreemptSelf:  allowPreemption,
+		AllowPreemptOther: true,
+	}
+	return NewAllocationFromSI(&si.Allocation{
+		AllocationKey:    allocationKey,
+		ApplicationID:    app,
+		PartitionName:    "default",
+		Priority:         priority,
+		ResourcePerAlloc: res.ToProto(),
+		Originator:       isOriginator,
+		PreemptionPolicy: policy,
+		AllocationTags:   allocTags,
+		NodeID:           nodeID,
+	})
+}
+
+// prepareAllocationAsks creates ten allocations for application "app1" (ask1 to
+// ask10), adds each of them to the node and returns them in creation order.
+// Every allocation uses a single resource type "first" and is created without
+// the required node tag. The asks flagged with sharedCreateTime get one common
+// create time that is captured before any allocation is created.
+func prepareAllocationAsks(t *testing.T, node *Node) []*Allocation {
+	createTime := time.Now()
+
+	specs := []struct {
+		key              string
+		allowPreemption  bool
+		originator       bool
+		priority         int32
+		quantity         resources.Quantity
+		sharedCreateTime bool
+	}{
+		// regular pods
+		{"ask1", true, false, 10, 10, false},
+		{"ask2", true, false, 10, 8, true},
+		{"ask3", true, false, 15, 10, false},
+		{"ask4", true, false, 10, 5, true},
+		{"ask5", true, false, 5, 5, false},
+		// opted out pods
+		{"ask6", false, false, 10, 10, false},
+		{"ask7", false, false, 10, 8, true},
+		{"ask8", false, false, 15, 10, false},
+		// driver/owner pods
+		{"ask9", false, true, 10, 5, true},
+		{"ask10", true, true, 5, 5, false},
+	}
+
+	result := make([]*Allocation, 0)
+	for _, spec := range specs {
+		res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": spec.quantity})
+		alloc := createAllocation(spec.key, "app1", node.NodeID, spec.allowPreemption, spec.originator, spec.priority, false, res)
+		if spec.sharedCreateTime {
+			alloc.createTime = createTime
+		}
+		assert.Assert(t, node.TryAddAllocation(alloc))
+		result = append(result, alloc)
+	}
+	return result
+}
+
+// removeAllocationAsks removes each of the given allocations from the node,
+// looked up by allocation key.
+func removeAllocationAsks(node *Node, asks []*Allocation) {
+	for i := range asks {
+		key := asks[i].GetAllocationKey()
+		node.RemoveAllocation(key)
+	}
+}
+
+// assignAllocationsToQueue adds every allocation to the application it belongs
+// to inside the queue. An application that is not yet registered in the queue is
+// created in the "default" partition, linked to the queue and registered first.
+func assignAllocationsToQueue(allocations []*Allocation, queue *Queue) {
+	for _, alloc := range allocations {
+		appID := alloc.applicationID
+		app, found := queue.applications[appID]
+		if !found {
+			app = newApplication(appID, "default", queue.QueuePath)
+			app.SetQueue(queue)
+			queue.applications[appID] = app
+		}
+		app.AddAllocation(alloc)
+	}
+}
+
+// removeAllocationFromQueue resets the queue to an empty application map, which
+// drops every application registered in the queue.
+func removeAllocationFromQueue(queue *Queue) {
+	queue.applications = map[string]*Application{}
+}
+
+// Allocations created by prepareAllocationAsks. "shared" marks the asks that
+// are given the one common create time captured before any ask is created.
+//
+// regular pods
+// ask1: pri - 10, res - 10
+// ask2: pri - 10, res - 8, shared create time
+// ask3: pri - 15, res - 10
+// ask4: pri - 10, res - 5, shared create time
+// ask5: pri - 5, res - 5
+//
+// opted out pods
+// ask6: pri - 10, res - 10
+// ask7: pri - 10, res - 8, shared create time
+// ask8: pri - 15, res - 10
+//
+// driver/owner pods
+// ask9: pri - 10, res - 5, shared create time
+// ask10: pri - 5, res - 5
+//
+// input order (as returned by prepareAllocationAsks): 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
+// expected sorted asks o/p: 5, 1, 4, 2, 3, 6, 7, 8, 10, 9
+func TestSortAllocations(t *testing.T) {
+	schedulable := &si.Resource{
+		Resources: map[string]*si.Quantity{"first": {Value: 100}},
+	}
+	node := NewNode(&si.NodeInfo{
+		NodeID:              "node",
+		Attributes:          nil,
+		SchedulableResource: schedulable,
+	})
+
+	asks := prepareAllocationAsks(t, node)
+	SortAllocations(asks)
+
+	expectedOrder := []string{
+		// regular pods
+		"ask5", "ask1", "ask4", "ask2", "ask3",
+		// opted out pods
+		"ask6", "ask7", "ask8",
+		// driver/owner pods
+		"ask10", "ask9",
+	}
+	for idx, key := range expectedOrder {
+		assert.Equal(t, asks[idx].GetAllocationKey(), key)
+	}
+
+	removeAllocationAsks(node, asks)
+}
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go
b/pkg/scheduler/objects/quota_change_preemptor.go
index f277a3da..40401360 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -21,16 +21,23 @@ package objects
import (
"math"
+ "go.uber.org/zap"
+
"github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/log"
)
type QuotaChangePreemptionContext struct {
- queue *Queue
+ queue *Queue
+ preemptableResource *resources.Resource
+ allocations []*Allocation
}
func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext {
preemptor := &QuotaChangePreemptionContext{
- queue: queue,
+ queue: queue,
+ preemptableResource: nil,
+ allocations: make([]*Allocation, 0),
}
return preemptor
}
@@ -49,16 +56,23 @@ func (qcp *QuotaChangePreemptionContext) tryPreemption() {
// quota change preemption has started, so mark the flag
qcp.queue.MarkQuotaChangePreemptionRunning()
- // Preemption logic goes here
+ // Get Preemptable Resource
+ qcp.preemptableResource = qcp.getPreemptableResources()
+
+ // Filter the allocations
+ qcp.allocations = qcp.filterAllocations()
+
+ // Sort the allocations
+ qcp.sortAllocations()
// quota change preemption has really evicted victims, so mark the flag
qcp.queue.MarkTriggerredQuotaChangePreemption()
}
-// GetPreemptableResources Get the preemptable resources for the queue
+// getPreemptableResources Get the preemptable resources for the queue
// Subtracting the usage from the max resource gives the preemptable resources.
// It could contain both positive and negative values. Only negative values
are preemptable.
-func (qcp *QuotaChangePreemptionContext) GetPreemptableResources()
*resources.Resource {
+func (qcp *QuotaChangePreemptionContext) getPreemptableResources()
*resources.Resource {
maxRes := qcp.queue.CloneMaxResource()
used := resources.SubOnlyExisting(qcp.queue.GetAllocatedResource(),
qcp.queue.GetPreemptingResource())
if maxRes.IsEmpty() || used.IsEmpty() {
@@ -77,3 +91,50 @@ func (qcp *QuotaChangePreemptionContext)
GetPreemptableResources() *resources.Re
}
return preemptableResource
}
+
+// filterAllocations walks all allocations of the applications in the queue and
+// returns the ones that can be considered as victims. Nothing is returned when
+// resources.IsZero reports the preemptable resource as zero. An allocation is
+// skipped when it shares no resource type with the preemptable resource,
+// requires a specific node, or is already released or preempted. The number of
+// allocations that passed the filter is logged.
+func (qcp *QuotaChangePreemptionContext) filterAllocations() []*Allocation {
+	if resources.IsZero(qcp.preemptableResource) {
+		return nil
+	}
+
+	var victims []*Allocation
+	for _, app := range qcp.queue.GetCopyOfApps() {
+		for _, alloc := range app.GetAllAllocations() {
+			switch {
+			case !qcp.preemptableResource.MatchAny(alloc.GetAllocatedResource()):
+				// none of the preemptable resource types is used by this allocation
+			case alloc.GetRequiredNode() != "":
+				// allocation requires a specific node
+			case alloc.IsReleased():
+				// allocation is already released
+			case alloc.IsPreempted():
+				// allocation is already preempted
+			default:
+				victims = append(victims, alloc)
+			}
+		}
+	}
+
+	log.Log(log.ShedQuotaChangePreemption).Info("Filtering allocations",
+		zap.String("queue", qcp.queue.GetQueuePath()),
+		zap.Int("filtered allocations", len(victims)),
+	)
+	return victims
+}
+
+// sortAllocations orders the filtered allocations with SortAllocations.
+// It does nothing when there are no allocations.
+func (qcp *QuotaChangePreemptionContext) sortAllocations() {
+	if len(qcp.allocations) == 0 {
+		return
+	}
+	SortAllocations(qcp.allocations)
+}
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go
b/pkg/scheduler/objects/quota_change_preemptor_test.go
index 7052c841..1159adf2 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
func TestQuotaChangeCheckPreconditions(t *testing.T) {
@@ -127,7 +128,61 @@ func TestQuotaChangeGetPreemptableResource(t *testing.T) {
tc.queue.maxResource = tc.maxResource
tc.queue.allocatedResource = tc.usedResource
preemptor := NewQuotaChangePreemptor(tc.queue)
- assert.Equal(t,
resources.Equals(preemptor.GetPreemptableResources(), tc.preemptable), true)
+ assert.Equal(t,
resources.Equals(preemptor.getPreemptableResources(), tc.preemptable), true)
+ })
+ }
+}
+
+// TestQuotaChangeFilterVictims checks how many of the ten allocations created by
+// prepareAllocationAsks are returned by filterAllocations for different
+// preemptable resources and for allocations that must be skipped.
+func TestQuotaChangeFilterVictims(t *testing.T) {
+ leaf, err := NewConfiguredQueue(configs.QueueConfig{
+ Name: "leaf",
+ }, nil, false, nil)
+ assert.NilError(t, err)
+
+ node := NewNode(&si.NodeInfo{
+ NodeID: "node",
+ Attributes: nil,
+ SchedulableResource: &si.Resource{
+ Resources: map[string]*si.Quantity{"first": {Value:
100}},
+ },
+ })
+ // irrelevantAllocations flags, by index:
+ // 0 - set a required node on asks[0] and asks[1]
+ // 1 - mark asks[2] and asks[3] as preempted
+ // 2 - mark asks[4] and asks[5] as released
+ // each enabled flag takes two allocations out of the expected count
+ testCases := []struct {
+ name string
+ queue *Queue
+ preemptableResource *resources.Resource
+ irrelevantAllocations []bool
+ expectedAllocationsCount int
+ }{
+ {"nil preemptable resource", leaf, nil, []bool{false, false,
false}, 0},
+ {"not even single res type in preemptable resource matches",
leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory":
100}), []bool{false, false, false}, 0},
+ {"res type in preemptable resource matches", leaf,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}),
[]bool{false, false, false}, 10},
+ {"irrelevant - required node allocations", leaf,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}),
[]bool{true, false, false}, 8},
+ {"irrelevant - already preempted allocations", leaf,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}),
[]bool{false, true, false}, 8},
+ {"irrelevant - already released allocations", leaf,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}),
[]bool{false, false, true}, 8},
+ {"combine irrelevant allocations", leaf,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}),
[]bool{true, true, true}, 4},
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // fresh allocations for every case, added to the node and to the leaf queue
+ asks := prepareAllocationAsks(t, node)
+ assignAllocationsToQueue(asks, leaf)
+ if tc.irrelevantAllocations[0] {
+ asks[0].SetRequiredNode("node2")
+ asks[1].SetRequiredNode("node2")
+ }
+ if tc.irrelevantAllocations[1] {
+ asks[2].MarkPreempted()
+ asks[3].MarkPreempted()
+ }
+ if tc.irrelevantAllocations[2] {
+ asks[4].SetReleased(true)
+ asks[5].SetReleased(true)
+ }
+ // the preemptable resource is injected directly instead of being computed
+ preemptor := NewQuotaChangePreemptor(tc.queue)
+ preemptor.preemptableResource = tc.preemptableResource
+ allocations := preemptor.filterAllocations()
+ assert.Equal(t, len(allocations),
tc.expectedAllocationsCount)
+ // clean up the node and the queue so the next case starts empty
+ removeAllocationAsks(node, asks)
+ removeAllocationFromQueue(leaf)
})
}
}
diff --git a/pkg/scheduler/objects/required_node_preemptor.go
b/pkg/scheduler/objects/required_node_preemptor.go
index 6d111b47..b48895c6 100644
--- a/pkg/scheduler/objects/required_node_preemptor.go
+++ b/pkg/scheduler/objects/required_node_preemptor.go
@@ -19,8 +19,6 @@
package objects
import (
- "sort"
-
"github.com/apache/yunikorn-core/pkg/common/resources"
)
@@ -70,19 +68,12 @@ func (p *PreemptionContext) filterAllocations()
filteringResult {
continue
}
- // atleast one of the required ask resource should match,
otherwise skip
- includeAllocation := false
- for k := range p.requiredAsk.GetAllocatedResource().Resources {
- if _, ok :=
allocation.GetAllocatedResource().Resources[k]; ok {
- includeAllocation = true
- break
- }
- }
- if includeAllocation {
- p.allocations = append(p.allocations, allocation)
- } else {
+ // at least one of the required ask resource should match,
otherwise skip
+ if
!p.requiredAsk.GetAllocatedResource().MatchAny(allocation.GetAllocatedResource())
{
result.atLeastOneResNotMatched++
+ continue
}
+ p.allocations = append(p.allocations, allocation)
}
return result
@@ -94,54 +85,7 @@ func (p *PreemptionContext) filterAllocations()
filteringResult {
// 3. By Create time or age of the ask (younger ask placed first),
// 4. By resource (ask with lesser allocated resources placed first)
func (p *PreemptionContext) sortAllocations() {
- sort.SliceStable(p.allocations, func(i, j int) bool {
- l := p.allocations[i]
- r := p.allocations[j]
-
- // sort based on the type
- lAskType := 1 // regular pod
- if l.IsOriginator() { // driver/owner pod
- lAskType = 3
- } else if !l.IsAllowPreemptSelf() { // opted out pod
- lAskType = 2
- }
- rAskType := 1
- if r.IsOriginator() {
- rAskType = 3
- } else if !r.IsAllowPreemptSelf() {
- rAskType = 2
- }
- if lAskType < rAskType {
- return true
- }
- if lAskType > rAskType {
- return false
- }
-
- // sort based on the priority
- lPriority := l.GetPriority()
- rPriority := r.GetPriority()
- if lPriority < rPriority {
- return true
- }
- if lPriority > rPriority {
- return false
- }
-
- // sort based on the age
- if !l.GetCreateTime().Equal(r.GetCreateTime()) {
- return l.GetCreateTime().After(r.GetCreateTime())
- }
-
- // sort based on the allocated resource
- lResource := l.GetAllocatedResource()
- rResource := r.GetAllocatedResource()
- if !resources.Equals(lResource, rResource) {
- delta := resources.Sub(lResource, rResource)
- return !resources.StrictlyGreaterThanZero(delta)
- }
- return true
- })
+ SortAllocations(p.allocations)
}
func (p *PreemptionContext) GetVictims() []*Allocation {
diff --git a/pkg/scheduler/objects/required_node_preemptor_test.go
b/pkg/scheduler/objects/required_node_preemptor_test.go
index c35d8deb..21285e64 100644
--- a/pkg/scheduler/objects/required_node_preemptor_test.go
+++ b/pkg/scheduler/objects/required_node_preemptor_test.go
@@ -20,177 +20,13 @@ package objects
import (
"testing"
- "time"
"gotest.tools/v3/assert"
"github.com/apache/yunikorn-core/pkg/common/resources"
- siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
-func createAllocationAsk(allocationKey string, app string, allowPreemption
bool, isOriginator bool, priority int32, res *resources.Resource) *Allocation {
- tags := map[string]string{}
- siAsk := &si.Allocation{
- AllocationKey: allocationKey,
- ApplicationID: app,
- PartitionName: "default",
- Priority: priority,
- ResourcePerAlloc: res.ToProto(),
- Originator: isOriginator,
- PreemptionPolicy: &si.PreemptionPolicy{AllowPreemptSelf:
allowPreemption, AllowPreemptOther: true},
- AllocationTags: tags,
- }
- ask := NewAllocationFromSI(siAsk)
- return ask
-}
-
-func createAllocation(allocationKey string, app string, nodeID string,
allowPreemption bool, isOriginator bool, priority int32, requiredNode bool, res
*resources.Resource) *Allocation {
- tags := map[string]string{}
- siAsk := &si.Allocation{
- AllocationKey: allocationKey,
- ApplicationID: app,
- PartitionName: "default",
- Priority: priority,
- ResourcePerAlloc: res.ToProto(),
- Originator: isOriginator,
- PreemptionPolicy: &si.PreemptionPolicy{AllowPreemptSelf:
allowPreemption, AllowPreemptOther: true},
- AllocationTags: tags,
- NodeID: nodeID,
- }
- if requiredNode {
- tags[siCommon.DomainYuniKorn+siCommon.KeyRequiredNode] = nodeID
- }
- ask := NewAllocationFromSI(siAsk)
- return ask
-}
-
-func prepareAllocationAsks(t *testing.T, node *Node) []*Allocation {
- createTime := time.Now()
-
- result := make([]*Allocation, 0)
-
- // regular pods
- alloc1 := createAllocation("ask1", "app1", node.NodeID, true, false,
10, false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
- assert.Assert(t, node.TryAddAllocation(alloc1))
- result = append(result, alloc1)
-
- alloc2 := createAllocation("ask2", "app1", node.NodeID, true, false,
10, false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))
- alloc2.createTime = createTime
- assert.Assert(t, node.TryAddAllocation(alloc2))
- result = append(result, alloc2)
-
- alloc3 := createAllocation("ask3", "app1", node.NodeID, true, false,
15, false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
- assert.Assert(t, node.TryAddAllocation(alloc3))
- result = append(result, alloc3)
-
- alloc4 := createAllocation("ask4", "app1", node.NodeID, true, false,
10, false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
- alloc4.createTime = createTime
- assert.Assert(t, node.TryAddAllocation(alloc4))
- result = append(result, alloc4)
-
- alloc5 := createAllocation("ask5", "app1", node.NodeID, true, false, 5,
false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
- assert.Assert(t, node.TryAddAllocation(alloc5))
- result = append(result, alloc5)
-
- // opted out pods
- alloc6 := createAllocation("ask6", "app1", node.NodeID, false, false,
10, false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
- assert.Assert(t, node.TryAddAllocation(alloc6))
- result = append(result, alloc6)
-
- alloc7 := createAllocation("ask7", "app1", node.NodeID, false, false,
10, false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))
- alloc7.createTime = createTime
- assert.Assert(t, node.TryAddAllocation(alloc7))
- result = append(result, alloc7)
-
- alloc8 := createAllocation("ask8", "app1", node.NodeID, false, false,
15, false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
- assert.Assert(t, node.TryAddAllocation(alloc8))
- result = append(result, alloc8)
-
- // driver/owner pods
- alloc9 := createAllocation("ask9", "app1", node.NodeID, false, true,
10, false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
- alloc9.createTime = createTime
- assert.Assert(t, node.TryAddAllocation(alloc9))
- result = append(result, alloc9)
-
- alloc10 := createAllocation("ask10", "app1", node.NodeID, true, true,
5, false,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
- assert.Assert(t, node.TryAddAllocation(alloc10))
- result = append(result, alloc10)
-
- return result
-}
-
-func removeAllocationAsks(node *Node, asks []*Allocation) {
- for _, ask := range asks {
- node.RemoveAllocation(ask.GetAllocationKey())
- }
-}
-
-// regular pods
-// ask1: pri - 10, create time - 1, res - 10
-// ask2: pri - 10, create time - 2, res - 8
-// ask3: pri - 15, create time - 3, res - 10
-// ask4: pri - 10, create time - 2, res - 5
-// ask5: pri - 5, create time - 4, res - 5
-
-// opted out pods
-// ask6: pri - 10, create time - 5, res - 10
-// ask7: pri - 10, create time - 2, res - 8
-// ask8: pri - 15, create time - 6, res - 10
-
-// driver/owner pods
-// ask9: pri - 10, create time - 2, res - 5
-// ask10: pri - 5, create time - 4, res - 5
-
-// original asks order: 6, 7, 8, 9, 10, 1, 2, 3, 4, 5
-// expected sorted asks o/p: 5, 1, 4, 2, 3, 6, 7, 8, 10, 9
-func TestSortAllocations(t *testing.T) {
- node := NewNode(&si.NodeInfo{
- NodeID: "node",
- Attributes: nil,
- SchedulableResource: &si.Resource{
- Resources: map[string]*si.Quantity{"first": {Value:
100}},
- },
- })
-
- requiredAsk := createAllocationAsk("ask", "app1", true, true, 20,
-
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
-
- p := NewRequiredNodePreemptor(node, requiredAsk)
- asks := prepareAllocationAsks(t, node)
- p.filterAllocations()
- p.sortAllocations()
- sortedAsks := p.getAllocations()
-
- // assert regular pods
- assert.Equal(t, sortedAsks[0].GetAllocationKey(), "ask5")
- assert.Equal(t, sortedAsks[1].GetAllocationKey(), "ask1")
- assert.Equal(t, sortedAsks[2].GetAllocationKey(), "ask4")
- assert.Equal(t, sortedAsks[3].GetAllocationKey(), "ask2")
- assert.Equal(t, sortedAsks[4].GetAllocationKey(), "ask3")
-
- // assert opted out pods
- assert.Equal(t, sortedAsks[5].GetAllocationKey(), "ask6")
- assert.Equal(t, sortedAsks[6].GetAllocationKey(), "ask7")
- assert.Equal(t, sortedAsks[7].GetAllocationKey(), "ask8")
-
- // assert driver/owner pods
- assert.Equal(t, sortedAsks[8].GetAllocationKey(), "ask10")
- assert.Equal(t, sortedAsks[9].GetAllocationKey(), "ask9")
-
- removeAllocationAsks(node, asks)
-}
-
func TestFilterAllocations(t *testing.T) {
node := NewNode(&si.NodeInfo{
NodeID: "node",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]