This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new caec1094 [YUNIKORN-3143] Victim ordering and selection (#1040)
caec1094 is described below

commit caec10943d783ee438686a4df6560e8168f235b8
Author: mani <[email protected]>
AuthorDate: Mon Nov 10 12:24:33 2025 +0530

    [YUNIKORN-3143] Victim ordering and selection (#1040)
    
    Closes: #1040
    
    Signed-off-by: mani <[email protected]>
---
 pkg/log/logger.go                                  |  61 +++---
 pkg/log/logger_test.go                             |   2 +-
 pkg/scheduler/objects/preemption_utilities.go      |  81 ++++++++
 pkg/scheduler/objects/preemption_utilities_test.go | 206 +++++++++++++++++++++
 pkg/scheduler/objects/quota_change_preemptor.go    |  71 ++++++-
 .../objects/quota_change_preemptor_test.go         |  57 +++++-
 pkg/scheduler/objects/required_node_preemptor.go   |  66 +------
 .../objects/required_node_preemptor_test.go        | 164 ----------------
 8 files changed, 446 insertions(+), 262 deletions(-)

diff --git a/pkg/log/logger.go b/pkg/log/logger.go
index b902556d..6ef29fe9 100644
--- a/pkg/log/logger.go
+++ b/pkg/log/logger.go
@@ -54,42 +54,43 @@ const (
 
 // Defined loggers: when adding new loggers, ids must be sequential, and all 
must be added to the loggers slice in the same order
 var (
-       Core             = &LoggerHandle{id: 0, name: "core"}
-       Test             = &LoggerHandle{id: 1, name: "test"}
-       Deprecation      = &LoggerHandle{id: 2, name: "deprecation"}
-       Config           = &LoggerHandle{id: 3, name: "core.config"}
-       Entrypoint       = &LoggerHandle{id: 4, name: "core.entrypoint"}
-       Events           = &LoggerHandle{id: 5, name: "core.events"}
-       OpenTracing      = &LoggerHandle{id: 6, name: "core.opentracing"}
-       Resources        = &LoggerHandle{id: 7, name: "core.resources"}
-       REST             = &LoggerHandle{id: 8, name: "core.rest"}
-       RMProxy          = &LoggerHandle{id: 9, name: "core.rmproxy"}
-       RPC              = &LoggerHandle{id: 10, name: "core.rpc"}
-       Metrics          = &LoggerHandle{id: 11, name: "core.metrics"}
-       Scheduler        = &LoggerHandle{id: 12, name: "core.scheduler"}
-       SchedAllocation  = &LoggerHandle{id: 13, name: 
"core.scheduler.allocation"}
-       SchedApplication = &LoggerHandle{id: 14, name: 
"core.scheduler.application"}
-       SchedAppUsage    = &LoggerHandle{id: 15, name: 
"core.scheduler.application.usage"}
-       SchedContext     = &LoggerHandle{id: 16, name: "core.scheduler.context"}
-       SchedFSM         = &LoggerHandle{id: 17, name: "core.scheduler.fsm"}
-       SchedHealth      = &LoggerHandle{id: 18, name: "core.scheduler.health"}
-       SchedNode        = &LoggerHandle{id: 19, name: "core.scheduler.node"}
-       SchedPartition   = &LoggerHandle{id: 20, name: 
"core.scheduler.partition"}
-       SchedPreemption  = &LoggerHandle{id: 21, name: 
"core.scheduler.preemption"}
-       SchedQueue       = &LoggerHandle{id: 22, name: "core.scheduler.queue"}
-       SchedReservation = &LoggerHandle{id: 23, name: 
"core.scheduler.reservation"}
-       SchedUGM         = &LoggerHandle{id: 24, name: "core.scheduler.ugm"}
-       SchedNodesUsage  = &LoggerHandle{id: 25, name: 
"core.scheduler.nodesusage"}
-       Security         = &LoggerHandle{id: 26, name: "core.security"}
-       Utils            = &LoggerHandle{id: 27, name: "core.utils"}
-       Diagnostics      = &LoggerHandle{id: 28, name: "core.diagnostics"}
+       Core                      = &LoggerHandle{id: 0, name: "core"}
+       Test                      = &LoggerHandle{id: 1, name: "test"}
+       Deprecation               = &LoggerHandle{id: 2, name: "deprecation"}
+       Config                    = &LoggerHandle{id: 3, name: "core.config"}
+       Entrypoint                = &LoggerHandle{id: 4, name: 
"core.entrypoint"}
+       Events                    = &LoggerHandle{id: 5, name: "core.events"}
+       OpenTracing               = &LoggerHandle{id: 6, name: 
"core.opentracing"}
+       Resources                 = &LoggerHandle{id: 7, name: "core.resources"}
+       REST                      = &LoggerHandle{id: 8, name: "core.rest"}
+       RMProxy                   = &LoggerHandle{id: 9, name: "core.rmproxy"}
+       RPC                       = &LoggerHandle{id: 10, name: "core.rpc"}
+       Metrics                   = &LoggerHandle{id: 11, name: "core.metrics"}
+       Scheduler                 = &LoggerHandle{id: 12, name: 
"core.scheduler"}
+       SchedAllocation           = &LoggerHandle{id: 13, name: 
"core.scheduler.allocation"}
+       SchedApplication          = &LoggerHandle{id: 14, name: 
"core.scheduler.application"}
+       SchedAppUsage             = &LoggerHandle{id: 15, name: 
"core.scheduler.application.usage"}
+       SchedContext              = &LoggerHandle{id: 16, name: 
"core.scheduler.context"}
+       SchedFSM                  = &LoggerHandle{id: 17, name: 
"core.scheduler.fsm"}
+       SchedHealth               = &LoggerHandle{id: 18, name: 
"core.scheduler.health"}
+       SchedNode                 = &LoggerHandle{id: 19, name: 
"core.scheduler.node"}
+       SchedPartition            = &LoggerHandle{id: 20, name: 
"core.scheduler.partition"}
+       SchedPreemption           = &LoggerHandle{id: 21, name: 
"core.scheduler.preemption"}
+       SchedQueue                = &LoggerHandle{id: 22, name: 
"core.scheduler.queue"}
+       SchedReservation          = &LoggerHandle{id: 23, name: 
"core.scheduler.reservation"}
+       SchedUGM                  = &LoggerHandle{id: 24, name: 
"core.scheduler.ugm"}
+       SchedNodesUsage           = &LoggerHandle{id: 25, name: 
"core.scheduler.nodesusage"}
+       Security                  = &LoggerHandle{id: 26, name: "core.security"}
+       Utils                     = &LoggerHandle{id: 27, name: "core.utils"}
+       Diagnostics               = &LoggerHandle{id: 28, name: 
"core.diagnostics"}
+       ShedQuotaChangePreemption = &LoggerHandle{id: 29, name: 
"core.scheduler.preemption.quotachange"} // NOTE(review): "Shed" looks like a typo for "Sched" (every other scheduler handle uses the Sched prefix); a rename must also update the loggers slice and the log.Log call sites
 )
 
 // this tracks all the known logger handles, used to preallocate the real 
logger instances when configuration changes
 var loggers = []*LoggerHandle{
        Core, Test, Deprecation, Config, Entrypoint, Events, OpenTracing, 
Resources, REST, RMProxy, RPC, Metrics,
        Scheduler, SchedAllocation, SchedApplication, SchedAppUsage, 
SchedContext, SchedFSM, SchedHealth, SchedNode,
-       SchedPartition, SchedPreemption, SchedQueue, SchedReservation, 
SchedUGM, SchedNodesUsage, Security, Utils, Diagnostics,
+       SchedPartition, SchedPreemption, SchedQueue, SchedReservation, 
SchedUGM, SchedNodesUsage, Security, Utils, Diagnostics, 
ShedQuotaChangePreemption, // must stay last: a handle's position in this slice must equal its id (29)
 }
 
 // structure to hold all current logger configuration state
diff --git a/pkg/log/logger_test.go b/pkg/log/logger_test.go
index 82e31f92..344ae892 100644
--- a/pkg/log/logger_test.go
+++ b/pkg/log/logger_test.go
@@ -39,7 +39,7 @@ func TestLoggerIDs(t *testing.T) {
        _ = Log(Test)
 
        // validate logger count
-       assert.Equal(t, 29, len(loggers), "wrong logger count")
+       assert.Equal(t, 30, len(loggers), "wrong logger count")
 
        // validate that all loggers are populated and have sequential ids
        for i := 0; i < len(loggers); i++ {
diff --git a/pkg/scheduler/objects/preemption_utilities.go 
b/pkg/scheduler/objects/preemption_utilities.go
new file mode 100644
index 00000000..c522ed68
--- /dev/null
+++ b/pkg/scheduler/objects/preemption_utilities.go
@@ -0,0 +1,81 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+       "sort"
+
+       "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
+// SortAllocations Sort allocations based on the following criteria in the 
specified order:
+// 1. By type (regular pods, opted out pods, driver/owner pods),
+// 2. By priority (least priority ask placed first),
+// 3. By Create time or age of the ask (younger ask placed first),
+// 4. By resource (ask with lesser allocated resources placed first; equal or incomparable ones keep input order)
+func SortAllocations(allocations []*Allocation) {
+       sort.SliceStable(allocations, func(i, j int) bool {
+               l := allocations[i]
+               r := allocations[j]
+
+               // sort based on the type
+               lAskType := 1         // regular pod
+               if l.IsOriginator() { // driver/owner pod
+                       lAskType = 3
+               } else if !l.IsAllowPreemptSelf() { // opted out pod
+                       lAskType = 2
+               }
+               rAskType := 1
+               if r.IsOriginator() {
+                       rAskType = 3
+               } else if !r.IsAllowPreemptSelf() {
+                       rAskType = 2
+               }
+               if lAskType < rAskType {
+                       return true
+               }
+               if lAskType > rAskType {
+                       return false
+               }
+
+               // sort based on the priority
+               lPriority := l.GetPriority()
+               rPriority := r.GetPriority()
+               if lPriority < rPriority {
+                       return true
+               }
+               if lPriority > rPriority {
+                       return false
+               }
+
+               // sort based on the age
+               if !l.GetCreateTime().Equal(r.GetCreateTime()) {
+                       return l.GetCreateTime().After(r.GetCreateTime())
+               }
+
+               // sort based on the allocated resource
+               lResource := l.GetAllocatedResource()
+               rResource := r.GetAllocatedResource()
+               if !resources.Equals(lResource, rResource) {
+                       // l first only when r-l is strictly positive, so less(l,r) and less(r,l) are never both true
+                       return resources.StrictlyGreaterThanZero(resources.Sub(rResource, lResource))
+               }
+               return false // equal keys must not be "less", otherwise SliceStable reorders them and is no longer stable
+       })
+}
diff --git a/pkg/scheduler/objects/preemption_utilities_test.go 
b/pkg/scheduler/objects/preemption_utilities_test.go
new file mode 100644
index 00000000..46bd5d47
--- /dev/null
+++ b/pkg/scheduler/objects/preemption_utilities_test.go
@@ -0,0 +1,206 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+       "testing"
+       "time"
+
+       "gotest.tools/v3/assert"
+
+       "github.com/apache/yunikorn-core/pkg/common/resources"
+       siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
+       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+func createAllocationAsk(allocationKey string, app string, allowPreemption 
bool, isOriginator bool, priority int32, res *resources.Resource) *Allocation {
+       tags := map[string]string{} // left empty: unlike createAllocation, no required-node tag is ever set
+       siAsk := &si.Allocation{
+               AllocationKey:    allocationKey,
+               ApplicationID:    app,
+               PartitionName:    "default",
+               Priority:         priority,
+               ResourcePerAlloc: res.ToProto(),
+               Originator:       isOriginator,
+               PreemptionPolicy: &si.PreemptionPolicy{AllowPreemptSelf: 
allowPreemption, AllowPreemptOther: true},
+               AllocationTags:   tags,
+       }
+       ask := NewAllocationFromSI(siAsk) // unlike createAllocation, NodeID is left unset
+       return ask
+}
+
+func createAllocation(allocationKey string, app string, nodeID string, 
allowPreemption bool, isOriginator bool, priority int32, requiredNode bool, res 
*resources.Resource) *Allocation {
+       tags := map[string]string{} // same map as siAsk.AllocationTags, so the tag added below is still seen by NewAllocationFromSI
+       siAsk := &si.Allocation{
+               AllocationKey:    allocationKey,
+               ApplicationID:    app,
+               PartitionName:    "default",
+               Priority:         priority,
+               ResourcePerAlloc: res.ToProto(),
+               Originator:       isOriginator,
+               PreemptionPolicy: &si.PreemptionPolicy{AllowPreemptSelf: 
allowPreemption, AllowPreemptOther: true},
+               AllocationTags:   tags,
+               NodeID:           nodeID,
+       }
+       if requiredNode { // set the yunikorn required-node tag to this allocation's node
+               tags[siCommon.DomainYuniKorn+siCommon.KeyRequiredNode] = nodeID
+       }
+       ask := NewAllocationFromSI(siAsk)
+       return ask
+}
+
+func prepareAllocationAsks(t *testing.T, node *Node) []*Allocation {
+       createTime := time.Now() // shared by ask2, ask4, ask7 and ask9; the others keep whatever create time NewAllocationFromSI assigned
+
+       result := make([]*Allocation, 0)
+
+       // regular pods: self preemption allowed, not originators
+       alloc1 := createAllocation("ask1", "app1", node.NodeID, true, false, 
10, false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
+       assert.Assert(t, node.TryAddAllocation(alloc1))
+       result = append(result, alloc1)
+
+       alloc2 := createAllocation("ask2", "app1", node.NodeID, true, false, 
10, false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))
+       alloc2.createTime = createTime
+       assert.Assert(t, node.TryAddAllocation(alloc2))
+       result = append(result, alloc2)
+
+       alloc3 := createAllocation("ask3", "app1", node.NodeID, true, false, 
15, false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
+       assert.Assert(t, node.TryAddAllocation(alloc3))
+       result = append(result, alloc3)
+
+       alloc4 := createAllocation("ask4", "app1", node.NodeID, true, false, 
10, false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       alloc4.createTime = createTime
+       assert.Assert(t, node.TryAddAllocation(alloc4))
+       result = append(result, alloc4)
+
+       alloc5 := createAllocation("ask5", "app1", node.NodeID, true, false, 5, 
false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       assert.Assert(t, node.TryAddAllocation(alloc5))
+       result = append(result, alloc5)
+
+       // opted out pods: AllowPreemptSelf=false, not originators
+       alloc6 := createAllocation("ask6", "app1", node.NodeID, false, false, 
10, false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
+       assert.Assert(t, node.TryAddAllocation(alloc6))
+       result = append(result, alloc6)
+
+       alloc7 := createAllocation("ask7", "app1", node.NodeID, false, false, 
10, false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))
+       alloc7.createTime = createTime
+       assert.Assert(t, node.TryAddAllocation(alloc7))
+       result = append(result, alloc7)
+
+       alloc8 := createAllocation("ask8", "app1", node.NodeID, false, false, 
15, false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
+       assert.Assert(t, node.TryAddAllocation(alloc8))
+       result = append(result, alloc8)
+
+       // driver/owner pods: originators (ask10 allows self preemption but still counts as an originator)
+       alloc9 := createAllocation("ask9", "app1", node.NodeID, false, true, 
10, false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       alloc9.createTime = createTime
+       assert.Assert(t, node.TryAddAllocation(alloc9))
+       result = append(result, alloc9)
+
+       alloc10 := createAllocation("ask10", "app1", node.NodeID, true, true, 
5, false,
+               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       assert.Assert(t, node.TryAddAllocation(alloc10))
+       result = append(result, alloc10)
+
+       return result // creation order: ask1 .. ask10
+}
+
+func removeAllocationAsks(node *Node, asks []*Allocation) {
+       for _, ask := range asks {
+               node.RemoveAllocation(ask.GetAllocationKey()) // undo the TryAddAllocation calls made by prepareAllocationAsks
+       }
+}
+
+func assignAllocationsToQueue(allocations []*Allocation, queue *Queue) {
+       for _, allocation := range allocations {
+               var app *Application
+               var ok bool
+               if _, ok = queue.applications[allocation.applicationID]; !ok { // first allocation of this app: create it
+                       app = newApplication(allocation.applicationID, 
"default", queue.QueuePath)
+                       app.SetQueue(queue)
+                       queue.applications[allocation.applicationID] = app // written straight into the map; no queue locking is done here
+               } else {
+                       app = queue.applications[allocation.applicationID]
+               }
+               app.AddAllocation(allocation)
+       }
+}
+
+func removeAllocationFromQueue(queue *Queue) {
+       queue.applications = make(map[string]*Application) // forget every registered app; no other queue state is reset here
+}
+
+// regular pods (self preemption allowed, not originators)
+// ask1: pri - 10, create time - own, res - 10
+// ask2: pri - 10, create time - shared, res - 8
+// ask3: pri - 15, create time - own, res - 10
+// ask4: pri - 10, create time - shared, res - 5
+// ask5: pri - 5, create time - own, res - 5
+
+// opted out pods (AllowPreemptSelf=false)
+// ask6: pri - 10, create time - own, res - 10
+// ask7: pri - 10, create time - shared, res - 8
+// ask8: pri - 15, create time - own, res - 10
+
+// driver/owner pods (originators)
+// ask9: pri - 10, create time - shared, res - 5
+// ask10: pri - 5, create time - own, res - 5
+
+// "shared": createTime taken first in prepareAllocationAsks (oldest); "own": set by NewAllocationFromSI, assumed later (younger)
+// input order from prepareAllocationAsks: 1..10; expected sorted o/p: 5, 1, 4, 2, 3, 6, 7, 8, 10, 9
+func TestSortAllocations(t *testing.T) {
+       node := NewNode(&si.NodeInfo{
+               NodeID:     "node",
+               Attributes: nil,
+               SchedulableResource: &si.Resource{
+                       Resources: map[string]*si.Quantity{"first": {Value: 
100}},
+               },
+       })
+
+       asks := prepareAllocationAsks(t, node)
+       SortAllocations(asks)
+       sortedAsks := asks // SortAllocations sorts the slice in place
+
+       // assert regular pods
+       assert.Equal(t, sortedAsks[0].GetAllocationKey(), "ask5")
+       assert.Equal(t, sortedAsks[1].GetAllocationKey(), "ask1")
+       assert.Equal(t, sortedAsks[2].GetAllocationKey(), "ask4")
+       assert.Equal(t, sortedAsks[3].GetAllocationKey(), "ask2")
+       assert.Equal(t, sortedAsks[4].GetAllocationKey(), "ask3")
+
+       // assert opted out pods
+       assert.Equal(t, sortedAsks[5].GetAllocationKey(), "ask6")
+       assert.Equal(t, sortedAsks[6].GetAllocationKey(), "ask7")
+       assert.Equal(t, sortedAsks[7].GetAllocationKey(), "ask8")
+
+       // assert driver/owner pods
+       assert.Equal(t, sortedAsks[8].GetAllocationKey(), "ask10")
+       assert.Equal(t, sortedAsks[9].GetAllocationKey(), "ask9")
+
+       removeAllocationAsks(node, asks)
+}
diff --git a/pkg/scheduler/objects/quota_change_preemptor.go 
b/pkg/scheduler/objects/quota_change_preemptor.go
index f277a3da..40401360 100644
--- a/pkg/scheduler/objects/quota_change_preemptor.go
+++ b/pkg/scheduler/objects/quota_change_preemptor.go
@@ -21,16 +21,23 @@ package objects
 import (
        "math"
 
+       "go.uber.org/zap"
+
        "github.com/apache/yunikorn-core/pkg/common/resources"
+       "github.com/apache/yunikorn-core/pkg/log"
 )
 
 type QuotaChangePreemptionContext struct {
-       queue *Queue
+       queue               *Queue              // queue whose max resource and usage drive the preemption
+       preemptableResource *resources.Resource // set by tryPreemption from getPreemptableResources
+       allocations         []*Allocation       // candidate victims: filtered, then sorted, by tryPreemption
 }
 
 func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext {
        preemptor := &QuotaChangePreemptionContext{
-               queue: queue,
+               queue:               queue,
+               preemptableResource: nil, // computed later by tryPreemption
+               allocations:         make([]*Allocation, 0),
        }
        return preemptor
 }
@@ -49,16 +56,23 @@ func (qcp *QuotaChangePreemptionContext) tryPreemption() {
        // quota change preemption has started, so mark the flag
        qcp.queue.MarkQuotaChangePreemptionRunning()
 
-       // Preemption logic goes here
+       // Get Preemptable Resource
+       qcp.preemptableResource = qcp.getPreemptableResources()
+
+       // Filter the allocations
+       qcp.allocations = qcp.filterAllocations()
+
+       // Sort the allocations
+       qcp.sortAllocations()
 
        // quota change preemption has really evicted victims, so mark the flag
        qcp.queue.MarkTriggerredQuotaChangePreemption()
 }
 
-// GetPreemptableResources Get the preemptable resources for the queue
+// getPreemptableResources Get the preemptable resources for the queue
 // Subtracting the usage from the max resource gives the preemptable resources.
 // It could contain both positive and negative values. Only negative values 
are preemptable.
-func (qcp *QuotaChangePreemptionContext) GetPreemptableResources() 
*resources.Resource {
+func (qcp *QuotaChangePreemptionContext) getPreemptableResources() 
*resources.Resource {
        maxRes := qcp.queue.CloneMaxResource()
        used := resources.SubOnlyExisting(qcp.queue.GetAllocatedResource(), 
qcp.queue.GetPreemptingResource())
        if maxRes.IsEmpty() || used.IsEmpty() {
@@ -77,3 +91,50 @@ func (qcp *QuotaChangePreemptionContext) 
GetPreemptableResources() *resources.Re
        }
        return preemptableResource
 }
+
+// filterAllocations returns the allocations running in the queue that are eligible 
to be chosen as victims (nil when nothing is preemptable or nothing qualifies)
+func (qcp *QuotaChangePreemptionContext) filterAllocations() []*Allocation {
+       if resources.IsZero(qcp.preemptableResource) { // nothing to reclaim, so no candidates
+               return nil
+       }
+       var allocations []*Allocation
+       apps := qcp.queue.GetCopyOfApps()
+
+       // Traverse allocations running in the queue
+       for _, app := range apps {
+               appAllocations := app.GetAllAllocations()
+               for _, alloc := range appAllocations {
+                       // the victim must use at least one of the preemptable resource 
types, otherwise evicting it would not reduce the over-quota usage
+                       if 
!qcp.preemptableResource.MatchAny(alloc.GetAllocatedResource()) {
+                               continue
+                       }
+
+                       // skip allocations pinned to a specific node (required node)
+                       if alloc.GetRequiredNode() != "" {
+                               continue
+                       }
+
+                       // skip already released allocations
+                       if alloc.IsReleased() {
+                               continue
+                       }
+
+                       // skip already preempted allocations
+                       if alloc.IsPreempted() {
+                               continue
+                       }
+                       allocations = append(allocations, alloc)
+               }
+       }
+       log.Log(log.ShedQuotaChangePreemption).Info("Filtering allocations",
+               zap.String("queue", qcp.queue.GetQueuePath()),
+               zap.Int("filtered allocations", len(allocations)),
+       )
+       return allocations
+}
+
+func (qcp *QuotaChangePreemptionContext) sortAllocations() {
+       if len(qcp.allocations) > 0 {
+               SortAllocations(qcp.allocations) // in place; same victim ordering as PreemptionContext.sortAllocations
+       }
+}
diff --git a/pkg/scheduler/objects/quota_change_preemptor_test.go 
b/pkg/scheduler/objects/quota_change_preemptor_test.go
index 7052c841..1159adf2 100644
--- a/pkg/scheduler/objects/quota_change_preemptor_test.go
+++ b/pkg/scheduler/objects/quota_change_preemptor_test.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/apache/yunikorn-core/pkg/common/configs"
        "github.com/apache/yunikorn-core/pkg/common/resources"
+       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
 func TestQuotaChangeCheckPreconditions(t *testing.T) {
@@ -127,7 +128,61 @@ func TestQuotaChangeGetPreemptableResource(t *testing.T) {
                        tc.queue.maxResource = tc.maxResource
                        tc.queue.allocatedResource = tc.usedResource
                        preemptor := NewQuotaChangePreemptor(tc.queue)
-                       assert.Equal(t, 
resources.Equals(preemptor.GetPreemptableResources(), tc.preemptable), true)
+                       assert.Equal(t, 
resources.Equals(preemptor.getPreemptableResources(), tc.preemptable), true)
+               })
+       }
+}
+
+func TestQuotaChangeFilterVictims(t *testing.T) {
+       leaf, err := NewConfiguredQueue(configs.QueueConfig{
+               Name: "leaf",
+       }, nil, false, nil)
+       assert.NilError(t, err)
+
+       node := NewNode(&si.NodeInfo{
+               NodeID:     "node",
+               Attributes: nil,
+               SchedulableResource: &si.Resource{
+                       Resources: map[string]*si.Quantity{"first": {Value: 
100}},
+               },
+       })
+       testCases := []struct {
+               name                     string
+               queue                    *Queue
+               preemptableResource      *resources.Resource
+               irrelevantAllocations    []bool // [0] required node, [1] already preempted, [2] already released
+               expectedAllocationsCount int
+       }{
+               {"nil preemptable resource", leaf, nil, []bool{false, false, 
false}, 0},
+               {"not even single res type in preemptable resource matches", 
leaf, resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 
100}), []bool{false, false, false}, 0},
+               {"res type in preemptable resource matches", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}), 
[]bool{false, false, false}, 10},
+               {"irrelevant - required node allocations", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}), 
[]bool{true, false, false}, 8},
+               {"irrelevant - already preempted allocations", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}), 
[]bool{false, true, false}, 8},
+               {"irrelevant - already released allocations", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}), 
[]bool{false, false, true}, 8},
+               {"combine irrelevant allocations", leaf, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 100}), 
[]bool{true, true, true}, 4},
+       }
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       asks := prepareAllocationAsks(t, node) // 10 allocations, all using resource type "first"
+                       assignAllocationsToQueue(asks, tc.queue)
+                       if tc.irrelevantAllocations[0] { // required node: 2 allocations become ineligible
+                               asks[0].SetRequiredNode("node2")
+                               asks[1].SetRequiredNode("node2")
+                       }
+                       if tc.irrelevantAllocations[1] { // already preempted: 2 more become ineligible
+                               asks[2].MarkPreempted()
+                               asks[3].MarkPreempted()
+                       }
+                       if tc.irrelevantAllocations[2] { // already released: 2 more become ineligible
+                               asks[4].SetReleased(true)
+                               asks[5].SetReleased(true)
+                       }
+                       preemptor := NewQuotaChangePreemptor(tc.queue)
+                       preemptor.preemptableResource = tc.preemptableResource
+                       allocations := preemptor.filterAllocations()
+                       assert.Equal(t, len(allocations), 
tc.expectedAllocationsCount)
+                       removeAllocationAsks(node, asks)
+                       removeAllocationFromQueue(tc.queue)
                })
        }
 }
diff --git a/pkg/scheduler/objects/required_node_preemptor.go 
b/pkg/scheduler/objects/required_node_preemptor.go
index 6d111b47..b48895c6 100644
--- a/pkg/scheduler/objects/required_node_preemptor.go
+++ b/pkg/scheduler/objects/required_node_preemptor.go
@@ -19,8 +19,6 @@
 package objects
 
 import (
-       "sort"
-
        "github.com/apache/yunikorn-core/pkg/common/resources"
 )
 
@@ -70,19 +68,12 @@ func (p *PreemptionContext) filterAllocations() 
filteringResult {
                        continue
                }
 
-               // atleast one of the required ask resource should match, 
otherwise skip
-               includeAllocation := false
-               for k := range p.requiredAsk.GetAllocatedResource().Resources {
-                       if _, ok := 
allocation.GetAllocatedResource().Resources[k]; ok {
-                               includeAllocation = true
-                               break
-                       }
-               }
-               if includeAllocation {
-                       p.allocations = append(p.allocations, allocation)
-               } else {
+               // at least one of the required ask resource should match, 
otherwise skip
+               if 
!p.requiredAsk.GetAllocatedResource().MatchAny(allocation.GetAllocatedResource())
 {
                        result.atLeastOneResNotMatched++
+                       continue
                }
+               p.allocations = append(p.allocations, allocation)
        }
 
        return result
@@ -94,54 +85,7 @@ func (p *PreemptionContext) filterAllocations() 
filteringResult {
 // 3. By Create time or age of the ask (younger ask placed first),
 // 4. By resource (ask with lesser allocated resources placed first)
 func (p *PreemptionContext) sortAllocations() {
-       sort.SliceStable(p.allocations, func(i, j int) bool {
-               l := p.allocations[i]
-               r := p.allocations[j]
-
-               // sort based on the type
-               lAskType := 1         // regular pod
-               if l.IsOriginator() { // driver/owner pod
-                       lAskType = 3
-               } else if !l.IsAllowPreemptSelf() { // opted out pod
-                       lAskType = 2
-               }
-               rAskType := 1
-               if r.IsOriginator() {
-                       rAskType = 3
-               } else if !r.IsAllowPreemptSelf() {
-                       rAskType = 2
-               }
-               if lAskType < rAskType {
-                       return true
-               }
-               if lAskType > rAskType {
-                       return false
-               }
-
-               // sort based on the priority
-               lPriority := l.GetPriority()
-               rPriority := r.GetPriority()
-               if lPriority < rPriority {
-                       return true
-               }
-               if lPriority > rPriority {
-                       return false
-               }
-
-               // sort based on the age
-               if !l.GetCreateTime().Equal(r.GetCreateTime()) {
-                       return l.GetCreateTime().After(r.GetCreateTime())
-               }
-
-               // sort based on the allocated resource
-               lResource := l.GetAllocatedResource()
-               rResource := r.GetAllocatedResource()
-               if !resources.Equals(lResource, rResource) {
-                       delta := resources.Sub(lResource, rResource)
-                       return !resources.StrictlyGreaterThanZero(delta)
-               }
-               return true
-       })
+       SortAllocations(p.allocations) // shared ordering, also used by QuotaChangePreemptionContext.sortAllocations
 }
 
 func (p *PreemptionContext) GetVictims() []*Allocation {
diff --git a/pkg/scheduler/objects/required_node_preemptor_test.go 
b/pkg/scheduler/objects/required_node_preemptor_test.go
index c35d8deb..21285e64 100644
--- a/pkg/scheduler/objects/required_node_preemptor_test.go
+++ b/pkg/scheduler/objects/required_node_preemptor_test.go
@@ -20,177 +20,13 @@ package objects
 
 import (
        "testing"
-       "time"
 
        "gotest.tools/v3/assert"
 
        "github.com/apache/yunikorn-core/pkg/common/resources"
-       siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
-func createAllocationAsk(allocationKey string, app string, allowPreemption 
bool, isOriginator bool, priority int32, res *resources.Resource) *Allocation {
-       tags := map[string]string{}
-       siAsk := &si.Allocation{
-               AllocationKey:    allocationKey,
-               ApplicationID:    app,
-               PartitionName:    "default",
-               Priority:         priority,
-               ResourcePerAlloc: res.ToProto(),
-               Originator:       isOriginator,
-               PreemptionPolicy: &si.PreemptionPolicy{AllowPreemptSelf: 
allowPreemption, AllowPreemptOther: true},
-               AllocationTags:   tags,
-       }
-       ask := NewAllocationFromSI(siAsk)
-       return ask
-}
-
-func createAllocation(allocationKey string, app string, nodeID string, 
allowPreemption bool, isOriginator bool, priority int32, requiredNode bool, res 
*resources.Resource) *Allocation {
-       tags := map[string]string{}
-       siAsk := &si.Allocation{
-               AllocationKey:    allocationKey,
-               ApplicationID:    app,
-               PartitionName:    "default",
-               Priority:         priority,
-               ResourcePerAlloc: res.ToProto(),
-               Originator:       isOriginator,
-               PreemptionPolicy: &si.PreemptionPolicy{AllowPreemptSelf: 
allowPreemption, AllowPreemptOther: true},
-               AllocationTags:   tags,
-               NodeID:           nodeID,
-       }
-       if requiredNode {
-               tags[siCommon.DomainYuniKorn+siCommon.KeyRequiredNode] = nodeID
-       }
-       ask := NewAllocationFromSI(siAsk)
-       return ask
-}
-
-func prepareAllocationAsks(t *testing.T, node *Node) []*Allocation {
-       createTime := time.Now()
-
-       result := make([]*Allocation, 0)
-
-       // regular pods
-       alloc1 := createAllocation("ask1", "app1", node.NodeID, true, false, 
10, false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
-       assert.Assert(t, node.TryAddAllocation(alloc1))
-       result = append(result, alloc1)
-
-       alloc2 := createAllocation("ask2", "app1", node.NodeID, true, false, 
10, false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))
-       alloc2.createTime = createTime
-       assert.Assert(t, node.TryAddAllocation(alloc2))
-       result = append(result, alloc2)
-
-       alloc3 := createAllocation("ask3", "app1", node.NodeID, true, false, 
15, false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
-       assert.Assert(t, node.TryAddAllocation(alloc3))
-       result = append(result, alloc3)
-
-       alloc4 := createAllocation("ask4", "app1", node.NodeID, true, false, 
10, false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
-       alloc4.createTime = createTime
-       assert.Assert(t, node.TryAddAllocation(alloc4))
-       result = append(result, alloc4)
-
-       alloc5 := createAllocation("ask5", "app1", node.NodeID, true, false, 5, 
false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
-       assert.Assert(t, node.TryAddAllocation(alloc5))
-       result = append(result, alloc5)
-
-       // opted out pods
-       alloc6 := createAllocation("ask6", "app1", node.NodeID, false, false, 
10, false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
-       assert.Assert(t, node.TryAddAllocation(alloc6))
-       result = append(result, alloc6)
-
-       alloc7 := createAllocation("ask7", "app1", node.NodeID, false, false, 
10, false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}))
-       alloc7.createTime = createTime
-       assert.Assert(t, node.TryAddAllocation(alloc7))
-       result = append(result, alloc7)
-
-       alloc8 := createAllocation("ask8", "app1", node.NodeID, false, false, 
15, false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}))
-       assert.Assert(t, node.TryAddAllocation(alloc8))
-       result = append(result, alloc8)
-
-       // driver/owner pods
-       alloc9 := createAllocation("ask9", "app1", node.NodeID, false, true, 
10, false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
-       alloc9.createTime = createTime
-       assert.Assert(t, node.TryAddAllocation(alloc9))
-       result = append(result, alloc9)
-
-       alloc10 := createAllocation("ask10", "app1", node.NodeID, true, true, 
5, false,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
-       assert.Assert(t, node.TryAddAllocation(alloc10))
-       result = append(result, alloc10)
-
-       return result
-}
-
-func removeAllocationAsks(node *Node, asks []*Allocation) {
-       for _, ask := range asks {
-               node.RemoveAllocation(ask.GetAllocationKey())
-       }
-}
-
-// regular pods
-// ask1: pri - 10, create time - 1, res - 10
-// ask2: pri - 10, create time - 2, res - 8
-// ask3: pri - 15, create time - 3, res - 10
-// ask4: pri - 10, create time - 2, res - 5
-// ask5: pri - 5, create time - 4, res - 5
-
-// opted out pods
-// ask6: pri - 10, create time - 5, res - 10
-// ask7: pri - 10, create time - 2, res - 8
-// ask8: pri - 15, create time - 6, res - 10
-
-// driver/owner pods
-// ask9: pri - 10, create time - 2, res - 5
-// ask10: pri - 5, create time - 4, res - 5
-
-// original asks order: 6, 7, 8, 9, 10, 1, 2, 3, 4, 5
-// expected sorted asks o/p: 5, 1, 4, 2, 3, 6, 7, 8, 10, 9
-func TestSortAllocations(t *testing.T) {
-       node := NewNode(&si.NodeInfo{
-               NodeID:     "node",
-               Attributes: nil,
-               SchedulableResource: &si.Resource{
-                       Resources: map[string]*si.Quantity{"first": {Value: 
100}},
-               },
-       })
-
-       requiredAsk := createAllocationAsk("ask", "app1", true, true, 20,
-               
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
-
-       p := NewRequiredNodePreemptor(node, requiredAsk)
-       asks := prepareAllocationAsks(t, node)
-       p.filterAllocations()
-       p.sortAllocations()
-       sortedAsks := p.getAllocations()
-
-       // assert regular pods
-       assert.Equal(t, sortedAsks[0].GetAllocationKey(), "ask5")
-       assert.Equal(t, sortedAsks[1].GetAllocationKey(), "ask1")
-       assert.Equal(t, sortedAsks[2].GetAllocationKey(), "ask4")
-       assert.Equal(t, sortedAsks[3].GetAllocationKey(), "ask2")
-       assert.Equal(t, sortedAsks[4].GetAllocationKey(), "ask3")
-
-       // assert opted out pods
-       assert.Equal(t, sortedAsks[5].GetAllocationKey(), "ask6")
-       assert.Equal(t, sortedAsks[6].GetAllocationKey(), "ask7")
-       assert.Equal(t, sortedAsks[7].GetAllocationKey(), "ask8")
-
-       // assert driver/owner pods
-       assert.Equal(t, sortedAsks[8].GetAllocationKey(), "ask10")
-       assert.Equal(t, sortedAsks[9].GetAllocationKey(), "ask9")
-
-       removeAllocationAsks(node, asks)
-}
-
 func TestFilterAllocations(t *testing.T) {
        node := NewNode(&si.NodeInfo{
                NodeID:     "node",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to