This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 718c7ea1 [YUNIKORN-3092] Reservations can permanently block nodes, 
leading to preemption failure and a stuck scheduler state (#1025)
718c7ea1 is described below

commit 718c7ea199db33ec4d7275473d9105a39ebebfc5
Author: Manikandan R <[email protected]>
AuthorDate: Mon Aug 18 20:40:07 2025 +0530

    [YUNIKORN-3092] Reservations can permanently block nodes, leading to 
preemption failure and a stuck scheduler state (#1025)
    
    Also consider nodes that hold reservations as preemption candidates, as
    long as those reservations are not for daemon set (required node)
    allocations and the reserved allocation has not already triggered
    preemption. Such a reservation is cancelled, to increase the chance that
    preemption succeeds, only if its priority is lower than the preemptor's
    priority and it has been waiting in the reservation for more than 1 hour.
    
    Closes: #1025
    
    Signed-off-by: Manikandan R <[email protected]>
---
 pkg/scheduler/objects/application.go      |   4 +-
 pkg/scheduler/objects/application_test.go | 142 ++++++++++++++++++++++++++++--
 pkg/scheduler/objects/preemption.go       |  30 ++++++-
 3 files changed, 166 insertions(+), 10 deletions(-)

diff --git a/pkg/scheduler/objects/application.go 
b/pkg/scheduler/objects/application.go
index a4ad75e6..b2c54318 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -47,7 +47,9 @@ import (
 )
 
 var (
-       reservationDelay          = 2 * time.Second
+       reservationDelay       = 2 * time.Second
+       reservationWaitTimeout = 60 * time.Minute
+       // reservationWaitTimeout is the wait time used when deciding whether a
+       // reservation held by a lower-priority ask may be cancelled so the node
+       // can be considered for preemption. TODO: make it configurable.
+       // NOTE(review): the check in preemption.go (initWorkingState) subtracts this
+       // value from the ask's age and then compares against it again, so the
+       // effective wait is more than 2*reservationWaitTimeout + reservationDelay; verify.
        completingTimeout         = 30 * time.Second
        terminatedTimeout         = 3 * 24 * time.Hour
        defaultPlaceholderTimeout = 15 * time.Minute
diff --git a/pkg/scheduler/objects/application_test.go 
b/pkg/scheduler/objects/application_test.go
index cf79b3a6..20715e0c 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2248,6 +2248,26 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
 }
 
+// TestTryAllocatePreemptNode checks that app2's ask3 does not preempt while the
+// preemption delay (presumably the 30*time.Second argument to tryAllocate) has
+// not passed, and that after ask3.createTime is moved back 30s the attempt
+// returns a Reserved result and allocs[0] is preempted.
 func TestTryAllocatePreemptNode(t *testing.T) {
+       iterator, getNode, _, ask3, _, app2, allocs := 
createPreemptNodeTestSetup(t)
+       preemptionAttemptsRemaining := 10
+
+       // preemption delay not yet passed, so preemption should fail
+       result3 := 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       // NOTE(review): the messages on the next two asserts describe the opposite
+       // of what is checked (result3 must be nil, allocs[1] must NOT be preempted),
+       // and "alloc1" labels allocs[1] here but allocs[0] further down.
+       assert.Assert(t, result3 == nil, "result3 expected")
+       assert.Assert(t, !allocs[1].IsPreempted(), "alloc1 should have been 
preempted")
+       assertAllocationLog(t, ask3)
+
+       // pass the time and try again
+       ask3.createTime = ask3.createTime.Add(-30 * time.Second)
+       result3 = 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       assert.Assert(t, result3 != nil, "result3 expected")
+       assert.Equal(t, Reserved, result3.ResultType, "expected reservation")
+       alloc3 := result3.Request
+       assert.Assert(t, alloc3 != nil, "alloc3 expected")
+       assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been 
preempted")
+}
+
+func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, 
func(NodeID string) *Node, *Queue, *Allocation, *Application, *Application, 
[]*Allocation) {
        node1 := newNode("node1", map[string]resources.Quantity{"first": 20})
        node2 := newNode("node2", map[string]resources.Quantity{"first": 20})
        nodeMap := map[string]*Node{"node1": node1, "node2": node2}
@@ -2330,18 +2350,124 @@ func TestTryAllocatePreemptNode(t *testing.T) {
        err = node1.Reserve(app2, ask3)
        assert.NilError(t, err)
 
-       // preemption delay not yet passed, so preemption should fail
-       result3 = 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
-       assert.Assert(t, result3 == nil, "result3 expected")
-       assert.Assert(t, !allocs[1].IsPreempted(), "alloc1 should have been 
preempted")
-       assertAllocationLog(t, ask3)
+       return iterator, getNode, childQ2, ask3, app1, app2, allocs
+}
+
+// createPreemptNodeWithReservationsTestSetup builds on createPreemptNodeTestSetup
+// (in which app2's ask3 is reserved on node1) and adds app3 on queue childQ2
+// ("root.parent.child2") with ask4 ("alloc4": 5 units of "first",
+// allowPreemptOther set, priority math.MaxInt32).
+// It returns iterator, getNode, ask3, ask4, app1, app3 and allocs; app2 is not returned.
+func createPreemptNodeWithReservationsTestSetup(t *testing.T) (func() 
NodeIterator, func(NodeID string) *Node, *Allocation, *Allocation, 
*Application, *Application, []*Allocation) {
+       iterator, getNode, childQ2, ask3, app1, _, allocs := 
createPreemptNodeTestSetup(t)
+
+       app3 := newApplication(appID3, "default", "root.parent.child2")
+       app3.SetQueue(childQ2)
+       childQ2.applications[appID3] = app3
+       ask4 := newAllocationAsk("alloc4", appID3, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       ask4.allowPreemptOther = true
+       ask4.priority = math.MaxInt32
+       err := app3.AddAllocationAsk(ask4)
+       assert.NilError(t, err)
+
+       return iterator, getNode, ask3, ask4, app1, app3, allocs
+}
+
+// TestTryAllocatePreemptNodeWithReservations checks that app3's ask4 (priority
+// math.MaxInt32) gets a Reserved result and allocs[0] is preempted even though
+// node1 already holds ask3's reservation, once ask4 is aged past the
+// preemption delay and reservationWaitTimeout is forced negative.
+func TestTryAllocatePreemptNodeWithReservations(t *testing.T) {
+       iterator, getNode, _, ask4, _, app3, allocs := 
createPreemptNodeWithReservationsTestSetup(t)
+
+       preemptionAttemptsRemaining := 10
 
        // pass the time and try again
-       ask3.createTime = ask3.createTime.Add(-30 * time.Second)
-       result3 = 
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       ask4.createTime = ask4.createTime.Add(-30 * time.Second)
+       // NOTE(review): this assigns the package-level reservationWaitTimeout and
+       // never restores it, so tests that run afterwards also see -60s; consider
+       // saving the old value and restoring it via t.Cleanup. With a negative
+       // value the wait-time check in preemption.go always passes, so this test
+       // does not exercise the timeout itself.
+       reservationWaitTimeout = -60 * time.Second
+       result3 := 
app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
        assert.Assert(t, result3 != nil, "result3 expected")
        assert.Equal(t, Reserved, result3.ResultType, "expected reservation")
-       alloc3 = result3.Request
+       alloc3 := result3.Request
+       assert.Assert(t, alloc3 != nil, "alloc3 expected")
+       assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been 
preempted")
+}
+
+// TestTryAllocatePreemptNodeWithReservationsWithHighPriority checks that ask4
+// does not preempt while its priority (-1) is lower than the reserved asks'
+// priority, and that after raising it to math.MaxInt32 (and moving
+// preemptCheckTime back 30s) it gets a Reserved result and allocs[0] is preempted.
+func TestTryAllocatePreemptNodeWithReservationsWithHighPriority(t *testing.T) {
+       iterator, getNode, _, ask4, _, app3, allocs := 
createPreemptNodeWithReservationsTestSetup(t)
+
+       // Make ask (preemptor) priority lower than the reserved asks (victims) 
so that preemption would not yield positive outcome
+       ask4.priority = -1
+
+       preemptionAttemptsRemaining := 10
+
+       // pass the time and try again
+       ask4.createTime = ask4.createTime.Add(-30 * time.Second)
+       // NOTE(review): overwrites the package-level reservationWaitTimeout without
+       // restoring it, which leaks into later tests; consider restoring via t.Cleanup.
+       reservationWaitTimeout = -60 * time.Second
+       result3 := 
app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       // NOTE(review): message says "result3 expected" but result3 must be nil here.
+       assert.Assert(t, result3 == nil, "result3 expected")
+
+       // Set higher priority than the reserved ask priority
+       ask4.priority = math.MaxInt32
+       ask4.preemptCheckTime = ask4.preemptCheckTime.Add(-30 * time.Second)
+       result4 := 
app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       // NOTE(review): message refers to result3 but the assert checks result4.
+       assert.Assert(t, result4 != nil, "result3 expected")
+       assert.Equal(t, Reserved, result4.ResultType, "expected reservation")
+       alloc3 := result4.Request
+       assert.Assert(t, alloc3 != nil, "alloc3 expected")
+       assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been 
preempted")
+}
+
+// TestTryAllocatePreemptNodeWithReservationsNotPossibleToCancel Ensures 
reservations cannot be cancelled because of the following constraints:
+// 1. Reservation wait timeout not exceeded
+// 2. Reserved allocation has required node set
+// 3. Reserved allocation has already been marked with "triggered preemption" flag
+// Preemption would start yielding results once above constraints have been 
resolved
+func TestTryAllocatePreemptNodeWithReservationsNotPossibleToCancel(t 
*testing.T) {
+       iterator, getNode, ask3, ask4, app1, app3, allocs := 
createPreemptNodeWithReservationsTestSetup(t)
+
+       // Make reservation ask (victim) as already "triggered preemption" so 
that it won't be considered for cancellation.
+       ask3.MarkTriggeredPreemption()
+
+       // Make reservation ask (victim) as daemon-set so that it won't be 
considered for cancellation.
+       ask5 := newAllocationAsk("alloc5", appID1, 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+       ask5.requiredNode = "node2"
+       err := app1.AddAllocationAsk(ask5)
+       assert.NilError(t, err)
+
+       preemptionAttemptsRemaining := 10
+
+       // on first attempt, should see a reservation on node2 since we're 
after the reservation timeout
+       var alloc11 *Allocation
+       ask5.createTime = ask5.createTime.Add(-10 * time.Second)
+       result1 := 
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       assert.Assert(t, result1 != nil, "result1 expected")
+       alloc11 = result1.Request
+       assert.Equal(t, "alloc5", alloc11.allocationKey, "wrong node 
assignment")
+       assert.Assert(t, result1.Request != nil, "alloc1 expected")
+       assert.Equal(t, "node2", result1.NodeID, "wrong node assignment")
+       assert.Equal(t, Reserved, result1.ResultType, "expected reservation")
+       allocs = append(allocs, alloc11)
+       err = getNode("node2").Reserve(app1, ask5)
+       assert.NilError(t, err)
+
+       // Set higher priority than the reserved ask priority but no preemption 
because reserved ask waiting time not exceeded
+       // NOTE(review): ask4.createTime has not been moved back at this point, so the
+       // 30s preemption delay alone may explain the nil result. reservationWaitTimeout
+       // may also already be -60s if earlier tests in this package ran first; this
+       // test assigns it below as well and never restores it (consider t.Cleanup).
+       ask4.priority = 1
+       result3 := 
app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       assert.Assert(t, result3 == nil, "result3 expected")
+
+       // Ensure reserved ask waiting time exceeds
+       // Both Node 1 & 2 have reservations, one allocation has required node 
set and another had marked for "triggered preemption" flag
+       // Still, preemption doesn't yield any positive outcome
+       ask4.createTime = ask4.createTime.Add(-30 * time.Second)
+       reservationWaitTimeout = -60 * time.Second
+       ask4.preemptCheckTime = ask4.preemptCheckTime.Add(-30 * time.Second)
+       result4 := 
app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       // NOTE(review): message says "result3 expected" but result4 must be nil here.
+       assert.Assert(t, result4 == nil, "result3 expected")
+
+       // Ensure reserved ask waiting time exceeds
+       // Ensure reserved allocation doesn't have required node set and not 
marked for "triggered preemption" flag
+       // Now preemption succeeds: a Reserved result is returned and allocs[0] is preempted
+       ask5.requiredNode = ""
+       ask3.preemptionTriggered = false
+       ask4.createTime = ask4.createTime.Add(-30 * time.Second)
+       reservationWaitTimeout = -60 * time.Second
+       ask4.preemptCheckTime = ask4.preemptCheckTime.Add(-30 * time.Second)
+       result5 := 
app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, 
getNode)
+       // NOTE(review): message refers to result3 but the assert checks result5.
+       assert.Assert(t, result5 != nil, "result3 expected")
+       assert.Equal(t, Reserved, result5.ResultType, "expected reservation")
+       alloc3 := result5.Request
        assert.Assert(t, alloc3 != nil, "alloc3 expected")
        assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been 
preempted")
 }
diff --git a/pkg/scheduler/objects/preemption.go 
b/pkg/scheduler/objects/preemption.go
index c9bf760d..8751000f 100644
--- a/pkg/scheduler/objects/preemption.go
+++ b/pkg/scheduler/objects/preemption.go
@@ -161,7 +161,35 @@ func (p *Preemptor) initWorkingState() {
 
        // walk node iterator and track available resources per node
        p.iterator.ForEachNode(func(node *Node) bool {
-               if !node.IsSchedulable() || (node.IsReserved() && 
!node.isReservedForAllocation(p.ask.GetAllocationKey())) || 
!node.FitInNode(p.ask.GetAllocatedResource()) {
+               // hasOtherReservations marks a node reserved for some other allocation; such
+               // a node is excluded below unless its reservations are cancelled here.
+               // NOTE(review): reservations are cancelled before the IsSchedulable and
+               // FitInNode checks that follow, so they may be cancelled on nodes that are
+               // then excluded anyway. Cancelling is also a side effect of building the
+               // working state, presumably before it is known whether preemption will
+               // succeed. Finally, the flag is cleared as soon as ONE reservation is
+               // cancelled, even if other reservations (e.g. ones skipped via `continue`
+               // for requiredNode/triggered preemption) remain on the node; recomputing
+               // it after the loop would avoid that.
+               hasOtherReservations := false
+               if node.IsReserved() && 
!node.isReservedForAllocation(p.ask.GetAllocationKey()) {
+                       hasOtherReservations = true
+                       for _, res := range node.GetReservations() {
+                               // Is Allocation daemon set?
+                               // Has this allocation already triggered 
preemption?
+                               if res.alloc.requiredNode != "" || 
res.alloc.HasTriggeredPreemption() {
+                                       continue
+                               }
+                               createTime := res.alloc.GetCreateTime()
+                               // Take reservation delay also into account
+                               // NOTE(review): askAge = time.Since(createTime) - reservationWaitTimeout - reservationDelay,
+                               // and the check below then requires askAge > reservationWaitTimeout. The reserved ask
+                               // must therefore have waited more than 2*reservationWaitTimeout + reservationDelay
+                               // (about 2h with the 60m default), not the 1 hour the commit message describes.
+                               // The intended check is likely time.Since(createTime) > reservationWaitTimeout+reservationDelay.
+                               askAge := 
time.Since(createTime.Add(reservationWaitTimeout).Add(reservationDelay))
+
+                               // Cancel reservation based on its priority and 
waiting time in reservation queue
+                               if res.alloc.GetPriority() < p.ask.priority && 
askAge > reservationWaitTimeout {
+                                       num := res.app.UnReserve(res.node, 
res.alloc)
+                                       
res.app.GetQueue().UnReserve(res.app.ApplicationID, num)
+                                       
log.Log(log.SchedApplication).Info("Cancelled reservation to consider node for 
preemption",
+                                               zap.String("triggered by 
appID", p.application.ApplicationID),
+                                               zap.String("triggered by 
allocationKey", p.ask.allocationKey),
+                                               zap.String("affected 
application ID", res.appID),
+                                               zap.String("affected 
allocationKey", res.allocKey),
+                                               zap.String("node", res.nodeID),
+                                               zap.Int("reservations count", 
num))
+                                       hasOtherReservations = false
+                               }
+                       }
+               }
+               if !node.IsSchedulable() || hasOtherReservations || 
!node.FitInNode(p.ask.GetAllocatedResource()) {
                        // node is not available, remove any potential victims 
from consideration
                        delete(allocationsByNode, node.NodeID)
                } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to