This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 9cdcd08d [YUNIKORN-3148] Propagate headroom in getOutstandingRequests
(#1049)
9cdcd08d is described below
commit 9cdcd08df8d0d6a5482ff2cb4217d19d9cdf5008
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Nov 26 10:51:32 2025 +1100
[YUNIKORN-3148] Propagate headroom in getOutstandingRequests (#1049)
Headroom depletion stops collecting outstanding requests. The headroom
usage did not propagate in the calculations causing each queue to mark
allocations for the full headroom.
This caused the collection of too many allocation asks, which results in
more nodes provisioned while auto scaling. Nodes get spun down again later
as the scheduler does not use them.
Closes: #1049
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/scheduler/objects/application.go | 7 +-
pkg/scheduler/objects/application_test.go | 58 ++++++++-----
pkg/scheduler/objects/queue.go | 31 +++++--
pkg/scheduler/objects/queue_test.go | 133 +++++++++++++++++++++++++-----
pkg/scheduler/partition.go | 4 +-
5 files changed, 180 insertions(+), 53 deletions(-)
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index cfc096e5..eadc8b56 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -945,11 +945,12 @@ func (sa *Application) canAllocationReserve(alloc
*Allocation) error {
return nil
}
-func (sa *Application) getOutstandingRequests(headRoom *resources.Resource,
userHeadRoom *resources.Resource, total *[]*Allocation) {
+func (sa *Application) getOutstandingRequests(headRoom *resources.Resource,
userHeadRoom *resources.Resource, total *[]*Allocation) *resources.Resource {
sa.RLock()
defer sa.RUnlock()
+ resTotal := resources.NewResource()
if sa.sortedRequests == nil {
- return
+ return resTotal
}
for _, request := range sa.sortedRequests {
if request.IsAllocated() || !request.IsSchedulingAttempted() {
@@ -961,11 +962,13 @@ func (sa *Application) getOutstandingRequests(headRoom
*resources.Resource, user
if !request.HasTriggeredScaleUp() &&
request.requiredNode == common.Empty && !sa.canReplace(request) {
// if headroom is still enough for the resources
*total = append(*total, request)
+ resTotal.AddTo(request.GetAllocatedResource())
}
headRoom = resources.SubOnlyExisting(headRoom,
request.GetAllocatedResource())
userHeadRoom = resources.SubOnlyExisting(userHeadRoom,
request.GetAllocatedResource())
}
}
+ return resTotal
}
// canReplace returns true if there is a placeholder for the task group
available for the request.
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index e00c67e9..307bcf49 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2981,47 +2981,58 @@ func TestGetOutstandingRequests(t *testing.T) {
Groups: []string{"group1"},
}
+ // Test Case 1: check with no requests
+ queueHeadroom, err :=
resources.NewResourceFromConf(map[string]string{"memory": "250", "vcores":
"25"})
+ assert.NilError(t, err, "failed to create queue headroom resource with
error")
+ userHeadroom, err :=
resources.NewResourceFromConf(map[string]string{"memory": "50", "vcores": "5"})
+ assert.NilError(t, err, "failed to create user headroom resource with
error")
+ var total0 []*Allocation
+ resTotal := app.getOutstandingRequests(queueHeadroom, userHeadroom,
&total0)
+ assert.Assert(t, resources.Equals(resTotal, resources.Zero),
"unexpected amount of collected resources %v", resTotal)
+
// Set up the Application's sortedRequests with AllocationAsks
sr := sortedRequests{}
sr.insert(allocationAsk1)
sr.insert(allocationAsk2)
app.sortedRequests = sr
- // Test Case 1: queueHeadroom meets, but userHeadroom does not
- queueHeadroom, err :=
resources.NewResourceFromConf(map[string]string{"memory": "250", "vcores":
"25"})
- assert.NilError(t, err, "failed to create queue headroom resource with
error")
- userHeadroom, err :=
resources.NewResourceFromConf(map[string]string{"memory": "50", "vcores": "5"})
- assert.NilError(t, err, "failed to create user headroom resource with
error")
- total1 := []*Allocation{}
- app.getOutstandingRequests(queueHeadroom, userHeadroom, &total1)
+ // Test Case 2: queueHeadroom meets, but userHeadroom does not
+ var total1 []*Allocation
+ resTotal = app.getOutstandingRequests(queueHeadroom, userHeadroom,
&total1)
assert.Equal(t, 0, len(total1), "expected one outstanding request for
TestCase 1")
+ assert.Assert(t, resources.Equals(resTotal, resources.Zero),
"unexpected amount of collected resources %v", resTotal)
- // Test Case 2: Both queueHeadroom and userHeadroom meet
+ // Test Case 3: Both queueHeadroom and userHeadroom meet
queueHeadroom2, err :=
resources.NewResourceFromConf(map[string]string{"memory": "250", "vcores":
"25"})
assert.NilError(t, err, "failed to create queue headroom resource with
error")
userHeadroom2, err :=
resources.NewResourceFromConf(map[string]string{"memory": "250", "vcores":
"25"})
assert.NilError(t, err, "failed to create user headroom resource with
error")
- total2 := []*Allocation{}
- app.getOutstandingRequests(queueHeadroom2, userHeadroom2, &total2)
+ var total2 []*Allocation
+ resTotal = app.getOutstandingRequests(queueHeadroom2, userHeadroom2,
&total2)
assert.Equal(t, 2, len(total2), "expected two outstanding requests for
TestCase 2")
+ expectedResTotal, err :=
resources.NewResourceFromConf(map[string]string{"memory": "200", "vcores":
"20"})
+ assert.NilError(t, err)
+ assert.Assert(t, resources.Equals(resTotal, expectedResTotal), "total
collected resources is %v, expected %v", resTotal, expectedResTotal)
- // Test Case 3: queueHeadroom does not meet, but userHeadroom meets
+ // Test Case 4: queueHeadroom does not meet, but userHeadroom meets
queueHeadroom3, err :=
resources.NewResourceFromConf(map[string]string{"memory": "50", "vcores": "5"})
assert.NilError(t, err, "failed to create queue headroom resource with
error")
userHeadroom3, err :=
resources.NewResourceFromConf(map[string]string{"memory": "250", "vcores":
"25"})
assert.NilError(t, err, "failed to create user headroom resource with
error")
- total3 := []*Allocation{}
- app.getOutstandingRequests(queueHeadroom3, userHeadroom3, &total3)
+ var total3 []*Allocation
+ resTotal = app.getOutstandingRequests(queueHeadroom3, userHeadroom3,
&total3)
assert.Equal(t, 0, len(total3), "expected one outstanding request for
TestCase 3")
+ assert.Assert(t, resources.Equals(resTotal, resources.Zero),
"unexpected amount of collected resources %v", resTotal)
- // Test Case 4: Neither queueHeadroom nor userHeadroom meets
+ // Test Case 5: Neither queueHeadroom nor userHeadroom meets
queueHeadroom4, err :=
resources.NewResourceFromConf(map[string]string{"memory": "50", "vcores": "5"})
assert.NilError(t, err, "failed to create queue headroom resource with
error")
userHeadroom4, err :=
resources.NewResourceFromConf(map[string]string{"memory": "80", "vcores": "8"})
assert.NilError(t, err, "failed to create user headroom resource with
error")
- total4 := []*Allocation{}
- app.getOutstandingRequests(queueHeadroom4, userHeadroom4, &total4)
+ var total4 []*Allocation
+ resTotal = app.getOutstandingRequests(queueHeadroom4, userHeadroom4,
&total4)
assert.Equal(t, 0, len(total4), "expected no outstanding requests for
TestCase 4")
+ assert.Assert(t, resources.Equals(resTotal, resources.Zero),
"unexpected amount of collected resources %v", resTotal)
}
func TestGetOutstandingRequests_NoSchedulingAttempt(t *testing.T) {
@@ -3047,8 +3058,9 @@ func TestGetOutstandingRequests_NoSchedulingAttempt(t
*testing.T) {
var total []*Allocation
headroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10})
userHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8})
- app.getOutstandingRequests(headroom, userHeadroom, &total)
-
+ resTotal := app.getOutstandingRequests(headroom, userHeadroom, &total)
+ expectedTotal :=
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 2})
+ assert.Assert(t, resources.Equals(resTotal, expectedTotal), "expected
resource %v, but got %v", expectedTotal, resTotal)
assert.Equal(t, 2, len(total))
assert.Equal(t, "alloc-2", total[0].allocationKey)
assert.Equal(t, "alloc-4", total[1].allocationKey)
@@ -3086,8 +3098,9 @@ func
TestGetOutstandingRequests_RequestTriggeredPreemptionHasRequiredNode(t *tes
var total []*Allocation
headroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10})
userHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8})
- app.getOutstandingRequests(headroom, userHeadroom, &total)
-
+ resTotal := app.getOutstandingRequests(headroom, userHeadroom, &total)
+ expectedTotal :=
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1})
+ assert.Assert(t, resources.Equals(resTotal, expectedTotal), "expected
resource %v, but got %v", expectedTotal, resTotal)
assert.Equal(t, 1, len(total))
assert.Equal(t, "alloc-4", total[0].allocationKey)
}
@@ -3119,8 +3132,9 @@ func TestGetOutstandingRequests_AskReplaceable(t
*testing.T) {
var total []*Allocation
headroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10})
userHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 8})
- app.getOutstandingRequests(headroom, userHeadroom, &total)
-
+ resTotal := app.getOutstandingRequests(headroom, userHeadroom, &total)
+ expectedTotal :=
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1})
+ assert.Assert(t, resources.Equals(resTotal, expectedTotal), "expected
resource %v, but got %v", expectedTotal, resTotal)
assert.Equal(t, 1, len(total))
assert.Equal(t, "alloc-3", total[0].allocationKey)
}
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index a56fd460..6aeded21 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -1634,22 +1634,43 @@ func (sq *Queue) TryPlaceholderAllocate(iterator func()
NodeIterator, getnode fu
return nil
}
-// GetQueueOutstandingRequests builds a slice of pending allocation asks that
fits into the queue's headroom.
-func (sq *Queue) GetQueueOutstandingRequests(total *[]*Allocation) {
+// GetOutstandingRequests builds a slice of pending allocation asks that fits
into the queue's headroom.
+// This method can only be called for the root of the queue hierarchy.
Otherwise it returns nil.
+func (sq *Queue) GetOutstandingRequests() []*Allocation {
+ total := make([]*Allocation, 0)
+ if sq.parent != nil {
+ return nil
+ }
+
+ for _, child := range sq.sortQueues() {
+ _ =
child.getOutStandingRequestsInternal(resources.NewResource(), &total)
+ }
+ return total
+}
+
+func (sq *Queue) getOutStandingRequestsInternal(parentHeadroom
*resources.Resource, total *[]*Allocation) *resources.Resource {
+ headRoom := sq.internalHeadRoom(parentHeadroom)
+ outstandingTotal := resources.NewResource() // accumulated resource
usage of all collected asks on this level
+
if sq.IsLeafQueue() {
- headRoom := sq.getMaxHeadRoom()
// while calculating outstanding requests, we calculate all the
requests that can fit into the queue's headroom,
// all these requests are qualified to trigger the up scaling.
for _, app := range sq.sortApplications(false) {
// calculate the users' headroom
userHeadroom :=
ugm.GetUserManager().Headroom(app.queuePath, app.ApplicationID, app.user)
- app.getOutstandingRequests(headRoom, userHeadroom,
total)
+ appTotal := app.getOutstandingRequests(headRoom,
userHeadroom, total)
+ outstandingTotal.AddTo(appTotal)
+ headRoom = resources.SubOnlyExisting(headRoom, appTotal)
}
} else {
for _, child := range sq.sortQueues() {
- child.GetQueueOutstandingRequests(total)
+ queueTotal :=
child.getOutStandingRequestsInternal(headRoom, total)
+ outstandingTotal.AddTo(queueTotal)
+ headRoom = resources.SubOnlyExisting(headRoom,
queueTotal)
}
}
+
+ return outstandingTotal
}
// TryReservedAllocate tries to allocate a reservation.
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index 9c1575d9..8b291bc0 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -1268,16 +1268,15 @@ func testOutstanding(t *testing.T, allocMap, usedMap
map[string]string) {
}
// verify get outstanding requests for root, and child queues
- rootTotal := make([]*Allocation, 0)
- root.GetQueueOutstandingRequests(&rootTotal)
+ rootTotal := root.GetOutstandingRequests()
assert.Equal(t, len(rootTotal), 15)
queue1Total := make([]*Allocation, 0)
- queue1.GetQueueOutstandingRequests(&queue1Total)
+ queue1.getOutStandingRequestsInternal(resources.NewResource(),
&queue1Total)
assert.Equal(t, len(queue1Total), 10)
queue2Total := make([]*Allocation, 0)
- queue2.GetQueueOutstandingRequests(&queue2Total)
+ queue2.getOutStandingRequestsInternal(resources.NewResource(),
&queue2Total)
assert.Equal(t, len(queue2Total), 5)
// simulate queue1 has some allocated resources
@@ -1285,26 +1284,27 @@ func testOutstanding(t *testing.T, allocMap, usedMap
map[string]string) {
assert.NilError(t, err, "failed to increment allocated resources")
queue1Total = make([]*Allocation, 0)
- queue1.GetQueueOutstandingRequests(&queue1Total)
+ queue1.getOutStandingRequestsInternal(resources.NewResource(),
&queue1Total)
assert.Equal(t, len(queue1Total), 5)
queue2Total = make([]*Allocation, 0)
- queue2.GetQueueOutstandingRequests(&queue2Total)
+ queue2.getOutStandingRequestsInternal(resources.NewResource(),
&queue2Total)
assert.Equal(t, len(queue2Total), 5)
- rootTotal = make([]*Allocation, 0)
- root.GetQueueOutstandingRequests(&rootTotal)
+ rootTotal = root.GetOutstandingRequests()
assert.Equal(t, len(rootTotal), 10)
// remove app2 from queue2
queue2.RemoveApplication(app2)
queue2Total = make([]*Allocation, 0)
- queue2.GetQueueOutstandingRequests(&queue2Total)
+ queue2.getOutStandingRequestsInternal(resources.NewResource(),
&queue2Total)
assert.Equal(t, len(queue2Total), 0)
- rootTotal = make([]*Allocation, 0)
- root.GetQueueOutstandingRequests(&rootTotal)
+ rootTotal = root.GetOutstandingRequests()
assert.Equal(t, len(rootTotal), 5)
+
+ // test for non-root queue
+ assert.Assert(t, queue1.GetOutstandingRequests() == nil)
}
func TestGetOutstandingOnlyUntracked(t *testing.T) {
@@ -1341,12 +1341,11 @@ func TestGetOutstandingOnlyUntracked(t *testing.T) {
}
// verify get outstanding requests for root, and child queues
- rootTotal := make([]*Allocation, 0)
- root.GetQueueOutstandingRequests(&rootTotal)
+ rootTotal := root.GetOutstandingRequests()
assert.Equal(t, len(rootTotal), 20)
queue1Total := make([]*Allocation, 0)
- queue1.GetQueueOutstandingRequests(&queue1Total)
+ queue1.getOutStandingRequestsInternal(resources.NewResource(),
&queue1Total)
assert.Equal(t, len(queue1Total), 20)
// simulate queue1 has some allocated resources
@@ -1355,13 +1354,12 @@ func TestGetOutstandingOnlyUntracked(t *testing.T) {
assert.NilError(t, err, "failed to increment allocated resources")
queue1Total = make([]*Allocation, 0)
- queue1.GetQueueOutstandingRequests(&queue1Total)
+ queue1.getOutStandingRequestsInternal(resources.NewResource(),
&queue1Total)
assert.Equal(t, len(queue1Total), 20)
headRoom := queue1.getHeadRoom()
assert.Assert(t, resources.IsZero(headRoom), "headroom should have been
zero")
- rootTotal = make([]*Allocation, 0)
- root.GetQueueOutstandingRequests(&rootTotal)
+ rootTotal = root.GetOutstandingRequests()
assert.Equal(t, len(rootTotal), 20)
headRoom = root.getHeadRoom()
assert.Assert(t, resources.IsZero(headRoom), "headroom should have been
zero")
@@ -1399,19 +1397,112 @@ func TestGetOutstandingRequestNoMax(t *testing.T) {
assert.NilError(t, err, "failed to add allocation ask")
}
- rootTotal := make([]*Allocation, 0)
- root.GetQueueOutstandingRequests(&rootTotal)
+ rootTotal := root.GetOutstandingRequests()
assert.Equal(t, len(rootTotal), 30)
queue1Total := make([]*Allocation, 0)
- queue1.GetQueueOutstandingRequests(&queue1Total)
+ queue1.getOutStandingRequestsInternal(resources.NewResource(),
&queue1Total)
assert.Equal(t, len(queue1Total), 10)
queue2Total := make([]*Allocation, 0)
- queue2.GetQueueOutstandingRequests(&queue2Total)
+ queue2.getOutStandingRequestsInternal(resources.NewResource(),
&queue2Total)
assert.Equal(t, len(queue2Total), 20)
}
+func TestOutstandingMultipleApps(t *testing.T) {
+ root, err := createRootQueue(map[string]string{"memory": "10"})
+ assert.NilError(t, err, "failed to create root queue with limit")
+ leaf, err := createManagedQueue(root, "leaf", false,
map[string]string{"memory": "10"})
+ assert.NilError(t, err, "failed to create leaf")
+
+ used, err := resources.NewResourceFromConf(map[string]string{"memory":
"8"})
+ assert.NilError(t, err, "failed to create basic resource")
+
+ err = leaf.TryIncAllocatedResource(used)
+ assert.NilError(t, err, "failed to increment allocated resources")
+ headRoom := leaf.getMaxHeadRoom()
+ assert.Assert(t, headRoom != nil, "headroom should not be nil")
+ expectedHeadroom, err :=
resources.NewResourceFromConf(map[string]string{"memory": "2"})
+ assert.NilError(t, err, "failed to create basic resource")
+ assert.Assert(t, resources.Equals(headRoom, expectedHeadroom),
"headRoom is %s instead of %s", headRoom, expectedHeadroom)
+
+ appRes, err :=
resources.NewResourceFromConf(map[string]string{"memory": "1"})
+ assert.NilError(t, err, "failed to create request resource")
+ for i := 0; i < 10; i++ {
+ appID := "app-" + strconv.Itoa(i)
+ app := newApplication(appID, "default", "root.leaf")
+ app.queue = leaf
+ leaf.AddApplication(app)
+
+ ask := newAllocationAsk(fmt.Sprintf("alloc-%d", i), appID,
appRes)
+ ask.SetSchedulingAttempted(true)
+ err = app.AddAllocationAsk(ask)
+ assert.NilError(t, err, "failed to add allocation ask")
+ }
+
+ rootTotal := root.GetOutstandingRequests()
+ assert.Equal(t, len(rootTotal), 2)
+
+ leafTotal := make([]*Allocation, 0)
+ leaf.getOutStandingRequestsInternal(nil, &leafTotal)
+ assert.Equal(t, len(rootTotal), 2)
+}
+
+// checks proper headroom calculation with multiple leafs
+// with 3 pending asks, only 2 of them are expected to be collected
+func TestOutStandingRequestMultipleChildrenWithMax(t *testing.T) {
+ root, err := createRootQueue(nil)
+ assert.NilError(t, err, "failed to create root queue")
+
+ parent, err := createManagedQueue(root, "parent", true,
map[string]string{
+ "memory": "6",
+ })
+ assert.NilError(t, err, "failed to create parent queue")
+
+ leaf1, err := createManagedQueue(parent, "leaf1", false,
map[string]string{
+ "memory": "5",
+ })
+ assert.NilError(t, err, "failed to create leaf1 queue")
+ leaf2, err := createManagedQueue(parent, "leaf2", false,
map[string]string{
+ "memory": "5",
+ })
+ assert.NilError(t, err, "failed to create leaf2 queue")
+
+ allocatedRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "memory": 2,
+ })
+ askRes := resources.NewResourceFromMap(map[string]resources.Quantity{
+ "memory": 1,
+ })
+ leaf1App := newApplication("app-leaf1", "default", "root.parent.leaf1")
+ leaf1App.SetQueue(leaf1)
+ leaf1.AddApplication(leaf1App)
+ leaf1.IncAllocatedResource(allocatedRes)
+ // use priority = 1000 for this ask to force ordering of queues when
sorting
+ askLeaf1 := newAllocationAskAll("ask-leaf1", "app-leaf1", "", askRes,
false, 1000)
+ askLeaf1.SetSchedulingAttempted(true)
+ err = leaf1App.AddAllocationAsk(askLeaf1)
+ assert.NilError(t, err, "could not add ask")
+
+ leaf2App := newApplication("app-leaf2", "default", "root.parent.leaf2")
+ leaf2App.SetQueue(leaf2)
+ ask1Leaf2 := newAllocationAsk("ask1-leaf2", "app-leaf2", askRes)
+ ask1Leaf2.SetSchedulingAttempted(true)
+ err = leaf2App.AddAllocationAsk(ask1Leaf2)
+ assert.NilError(t, err, "could not add ask")
+ ask2Leaf2 := newAllocationAsk("ask1-leaf2", "app-leaf2", askRes)
+ ask2Leaf2.SetSchedulingAttempted(true)
+ err = leaf2App.AddAllocationAsk(ask2Leaf2)
+ assert.NilError(t, err, "could not add ask")
+ leaf2.AddApplication(leaf2App)
+ leaf2.IncAllocatedResource(allocatedRes)
+
+ outstanding := root.GetOutstandingRequests()
+ assert.Equal(t, 2, len(outstanding), "expected 2 outstanding requests
to be collected")
+ assert.Equal(t, "ask-leaf1", outstanding[0].allocationKey)
+ assert.Equal(t, "ask1-leaf2", outstanding[1].allocationKey)
+}
+
func TestAllocationCalcRoot(t *testing.T) {
resMap := map[string]string{"memory": "100", "vcores": "10"}
// create the root: must set a max on the queue
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 96c58943..870f46cb 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -805,9 +805,7 @@ func (pc *PartitionContext) calculateOutstandingRequests()
[]*objects.Allocation
if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
return nil
}
- outstanding := make([]*objects.Allocation, 0)
- pc.root.GetQueueOutstandingRequests(&outstanding)
- return outstanding
+ return pc.root.GetOutstandingRequests()
}
// Try regular allocation for the partition
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]