This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 718c7ea1 [YUNIKORN-3092] Reservations can permanently block nodes,
leading to preemption failure and a stuck scheduler state (#1025)
718c7ea1 is described below
commit 718c7ea199db33ec4d7275473d9105a39ebebfc5
Author: Manikandan R <[email protected]>
AuthorDate: Mon Aug 18 20:40:07 2025 +0530
[YUNIKORN-3092] Reservations can permanently block nodes, leading to
preemption failure and a stuck scheduler state (#1025)
Also consider nodes that hold reservations for other allocations as preemption
candidates, as long as the reserved allocation is not a daemon set allocation
(no required node) and has not already triggered preemption. This increases the
chances of preemption by cancelling such reservations, but only if the reserved
allocation's priority is lower than the preemptor's priority and it has been
waiting in the reservation for more than 1 hour.
Closes: #1025
Signed-off-by: Manikandan R <[email protected]>
---
pkg/scheduler/objects/application.go | 4 +-
pkg/scheduler/objects/application_test.go | 142 ++++++++++++++++++++++++++++--
pkg/scheduler/objects/preemption.go | 30 ++++++-
3 files changed, 166 insertions(+), 10 deletions(-)
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index a4ad75e6..b2c54318 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -47,7 +47,9 @@ import (
)
var (
- reservationDelay = 2 * time.Second
+ reservationDelay = 2 * time.Second
+ reservationWaitTimeout = 60 * time.Minute
+ // TODO: make reservationWaitTimeout (declared on the line above) configurable
completingTimeout = 30 * time.Second
terminatedTimeout = 3 * 24 * time.Hour
defaultPlaceholderTimeout = 15 * time.Minute
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index cf79b3a6..20715e0c 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2248,6 +2248,26 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
}
func TestTryAllocatePreemptNode(t *testing.T) {
+ iterator, getNode, _, ask3, _, app2, allocs :=
createPreemptNodeTestSetup(t)
+ preemptionAttemptsRemaining := 10
+
+ // preemption delay not yet passed, so preemption should fail
+ result3 :=
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ assert.Assert(t, result3 == nil, "result3 expected")
+ assert.Assert(t, !allocs[1].IsPreempted(), "alloc1 should have been
preempted")
+ assertAllocationLog(t, ask3)
+
+ // pass the time and try again
+ ask3.createTime = ask3.createTime.Add(-30 * time.Second)
+ result3 =
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ assert.Assert(t, result3 != nil, "result3 expected")
+ assert.Equal(t, Reserved, result3.ResultType, "expected reservation")
+ alloc3 := result3.Request
+ assert.Assert(t, alloc3 != nil, "alloc3 expected")
+ assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been
preempted")
+}
+
+func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator,
func(NodeID string) *Node, *Queue, *Allocation, *Application, *Application,
[]*Allocation) {
node1 := newNode("node1", map[string]resources.Quantity{"first": 20})
node2 := newNode("node2", map[string]resources.Quantity{"first": 20})
nodeMap := map[string]*Node{"node1": node1, "node2": node2}
@@ -2330,18 +2350,124 @@ func TestTryAllocatePreemptNode(t *testing.T) {
err = node1.Reserve(app2, ask3)
assert.NilError(t, err)
- // preemption delay not yet passed, so preemption should fail
- result3 =
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
- assert.Assert(t, result3 == nil, "result3 expected")
- assert.Assert(t, !allocs[1].IsPreempted(), "alloc1 should have been
preempted")
- assertAllocationLog(t, ask3)
+ return iterator, getNode, childQ2, ask3, app1, app2, allocs
+}
+
+func createPreemptNodeWithReservationsTestSetup(t *testing.T) (func()
NodeIterator, func(NodeID string) *Node, *Allocation, *Allocation,
*Application, *Application, []*Allocation) {
+ iterator, getNode, childQ2, ask3, app1, _, allocs :=
createPreemptNodeTestSetup(t)
+
+ app3 := newApplication(appID3, "default", "root.parent.child2")
+ app3.SetQueue(childQ2)
+ childQ2.applications[appID3] = app3
+ ask4 := newAllocationAsk("alloc4", appID3,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ ask4.allowPreemptOther = true
+ ask4.priority = math.MaxInt32
+ err := app3.AddAllocationAsk(ask4)
+ assert.NilError(t, err)
+
+ return iterator, getNode, ask3, ask4, app1, app3, allocs
+}
+
+func TestTryAllocatePreemptNodeWithReservations(t *testing.T) {
+ iterator, getNode, _, ask4, _, app3, allocs :=
createPreemptNodeWithReservationsTestSetup(t)
+
+ preemptionAttemptsRemaining := 10
// pass the time and try again
- ask3.createTime = ask3.createTime.Add(-30 * time.Second)
- result3 =
app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ ask4.createTime = ask4.createTime.Add(-30 * time.Second)
+ reservationWaitTimeout = -60 * time.Second
+ result3 :=
app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
assert.Assert(t, result3 != nil, "result3 expected")
assert.Equal(t, Reserved, result3.ResultType, "expected reservation")
- alloc3 = result3.Request
+ alloc3 := result3.Request
+ assert.Assert(t, alloc3 != nil, "alloc3 expected")
+ assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been
preempted")
+}
+
+func TestTryAllocatePreemptNodeWithReservationsWithHighPriority(t *testing.T) {
+ iterator, getNode, _, ask4, _, app3, allocs :=
createPreemptNodeWithReservationsTestSetup(t)
+
+ // Make ask (preemptor) priority lower than the reserved asks (victims)
so that preemption would not yield positive outcome
+ ask4.priority = -1
+
+ preemptionAttemptsRemaining := 10
+
+ // pass the time and try again
+ ask4.createTime = ask4.createTime.Add(-30 * time.Second)
+ reservationWaitTimeout = -60 * time.Second
+ result3 :=
app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ assert.Assert(t, result3 == nil, "result3 expected")
+
+ // Set higher priority than the reserved ask priority
+ ask4.priority = math.MaxInt32
+ ask4.preemptCheckTime = ask4.preemptCheckTime.Add(-30 * time.Second)
+ result4 :=
app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ assert.Assert(t, result4 != nil, "result3 expected")
+ assert.Equal(t, Reserved, result4.ResultType, "expected reservation")
+ alloc3 := result4.Request
+ assert.Assert(t, alloc3 != nil, "alloc3 expected")
+ assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been
preempted")
+}
+
+// TestTryAllocatePreemptNodeWithReservationsNotPossibleToCancel ensures that
+// reservations cannot be cancelled while any of the following constraints hold:
+// 1. The reservation wait timeout has not been exceeded
+// 2. The reserved allocation has a required node set
+// 3. The reserved allocation has already been marked with the "triggered preemption" flag
+// Preemption succeeds once the above constraints have been resolved.
+func TestTryAllocatePreemptNodeWithReservationsNotPossibleToCancel(t
*testing.T) {
+ iterator, getNode, ask3, ask4, app1, app3, allocs :=
createPreemptNodeWithReservationsTestSetup(t)
+
+ // Make reservation ask (victim) as already "triggered preemption" so
that it won't be considered for cancellation.
+ ask3.MarkTriggeredPreemption()
+
+ // Make reservation ask (victim) as daemon-set so that it won't be
considered for cancellation.
+ ask5 := newAllocationAsk("alloc5", appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
+ ask5.requiredNode = "node2"
+ err := app1.AddAllocationAsk(ask5)
+ assert.NilError(t, err)
+
+ preemptionAttemptsRemaining := 10
+
+ // on first attempt, should see a reservation on node2 since we're
after the reservation timeout
+ var alloc11 *Allocation
+ ask5.createTime = ask5.createTime.Add(-10 * time.Second)
+ result1 :=
app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ assert.Assert(t, result1 != nil, "result1 expected")
+ alloc11 = result1.Request
+ assert.Equal(t, "alloc5", alloc11.allocationKey, "wrong node
assignment")
+ assert.Assert(t, result1.Request != nil, "alloc1 expected")
+ assert.Equal(t, "node2", result1.NodeID, "wrong node assignment")
+ assert.Equal(t, Reserved, result1.ResultType, "expected reservation")
+ allocs = append(allocs, alloc11)
+ err = getNode("node2").Reserve(app1, ask5)
+ assert.NilError(t, err)
+
+ // Set higher priority than the reserved ask priority but no preemption
because reserved ask waiting time not exceeded
+ ask4.priority = 1
+ result3 :=
app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ assert.Assert(t, result3 == nil, "result3 expected")
+
+ // Make the reservation wait timeout elapse (reservationWaitTimeout is set negative) and age the preemptor ask past the preemption delay
+ // Both node1 and node2 have reservations: one reserved allocation has a required node set and the other is marked with the "triggered preemption" flag
+ // so preemption still does not succeed
+ ask4.createTime = ask4.createTime.Add(-30 * time.Second)
+ reservationWaitTimeout = -60 * time.Second
+ ask4.preemptCheckTime = ask4.preemptCheckTime.Add(-30 * time.Second)
+ result4 :=
app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ assert.Assert(t, result4 == nil, "result3 expected")
+
+ // Make the reservation wait timeout elapse (reservationWaitTimeout is set negative) and age the preemptor ask past the preemption delay
+ // Clear the required node on the reserved allocation and reset its "triggered preemption" flag
+ // Now the reservations can be cancelled and preemption is expected to succeed
+ ask5.requiredNode = ""
+ ask3.preemptionTriggered = false
+ ask4.createTime = ask4.createTime.Add(-30 * time.Second)
+ reservationWaitTimeout = -60 * time.Second
+ ask4.preemptCheckTime = ask4.preemptCheckTime.Add(-30 * time.Second)
+ result5 :=
app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first":
18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator,
getNode)
+ assert.Assert(t, result5 != nil, "result3 expected")
+ assert.Equal(t, Reserved, result5.ResultType, "expected reservation")
+ alloc3 := result5.Request
assert.Assert(t, alloc3 != nil, "alloc3 expected")
assert.Assert(t, allocs[0].IsPreempted(), "alloc1 should have been
preempted")
}
diff --git a/pkg/scheduler/objects/preemption.go
b/pkg/scheduler/objects/preemption.go
index c9bf760d..8751000f 100644
--- a/pkg/scheduler/objects/preemption.go
+++ b/pkg/scheduler/objects/preemption.go
@@ -161,7 +161,35 @@ func (p *Preemptor) initWorkingState() {
// walk node iterator and track available resources per node
p.iterator.ForEachNode(func(node *Node) bool {
- if !node.IsSchedulable() || (node.IsReserved() &&
!node.isReservedForAllocation(p.ask.GetAllocationKey())) ||
!node.FitInNode(p.ask.GetAllocatedResource()) {
+ hasOtherReservations := false
+ if node.IsReserved() &&
!node.isReservedForAllocation(p.ask.GetAllocationKey()) {
+ hasOtherReservations = true
+ for _, res := range node.GetReservations() {
+ // Is Allocation daemon set?
+ // Has this allocation already triggered
preemption?
+ if res.alloc.requiredNode != "" ||
res.alloc.HasTriggeredPreemption() {
+ continue
+ }
+ createTime := res.alloc.GetCreateTime()
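+ // Take the reservation delay into account as well.
+ // NOTE(review): askAge is measured from createTime + reservationWaitTimeout + reservationDelay and is then
+ // compared against reservationWaitTimeout again, so the effective wait is roughly
+ // 2*reservationWaitTimeout + reservationDelay rather than reservationWaitTimeout — confirm this is intended.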
+ askAge :=
time.Since(createTime.Add(reservationWaitTimeout).Add(reservationDelay))
+
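+ // Cancel the reservation when its priority is lower than the preemptor's and it has waited long enough.
+ // NOTE(review): hasOtherReservations is cleared after any single cancellation, even if other reservations
+ // on this node were skipped above or kept — confirm a node with remaining reservations should be a candidate.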
+ if res.alloc.GetPriority() < p.ask.priority &&
askAge > reservationWaitTimeout {
+ num := res.app.UnReserve(res.node,
res.alloc)
+
res.app.GetQueue().UnReserve(res.app.ApplicationID, num)
+
log.Log(log.SchedApplication).Info("Cancelled reservation to consider node for
preemption",
+ zap.String("triggered by
appID", p.application.ApplicationID),
+ zap.String("triggered by
allocationKey", p.ask.allocationKey),
+ zap.String("affected
application ID", res.appID),
+ zap.String("affected
allocationKey", res.allocKey),
+ zap.String("node", res.nodeID),
+ zap.Int("reservations count",
num))
+ hasOtherReservations = false
+ }
+ }
+ }
+ if !node.IsSchedulable() || hasOtherReservations ||
!node.FitInNode(p.ask.GetAllocatedResource()) {
// node is not available, remove any potential victims
from consideration
delete(allocationsByNode, node.NodeID)
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]