This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 8a4acda8 [YUNIKORN-2802] Prune zero values from computed resource
types (#950)
8a4acda8 is described below
commit 8a4acda8ae96100117d83d9dc10a4a9dff6f9ed1
Author: qzhu <[email protected]>
AuthorDate: Wed Aug 21 13:38:23 2024 -0500
[YUNIKORN-2802] Prune zero values from computed resource types (#950)
Closes: #950
Signed-off-by: Craig Condit <[email protected]>
---
pkg/scheduler/objects/application.go | 11 ++++++++---
pkg/scheduler/objects/application_test.go | 13 ++++++++-----
pkg/scheduler/objects/node.go | 4 ++++
pkg/scheduler/objects/queue.go | 15 +++++++++++++++
pkg/scheduler/partition_test.go | 20 +++++++++++++-------
pkg/scheduler/ugm/queue_tracker.go | 1 +
6 files changed, 49 insertions(+), 15 deletions(-)
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 1ccde74e..4b84c957 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -581,6 +581,7 @@ func (sa *Application) removeAsksInternal(allocKey string,
detail si.EventRecord
if !ask.IsAllocated() {
deltaPendingResource =
ask.GetAllocatedResource()
sa.pending = resources.Sub(sa.pending,
deltaPendingResource)
+ sa.pending.Prune()
}
delete(sa.requests, allocKey)
sa.sortedRequests.remove(ask)
@@ -648,6 +649,7 @@ func (sa *Application) AddAllocationAsk(ask *Allocation)
error {
// Update total pending resource
delta.SubFrom(oldAskResource)
sa.pending = resources.Add(sa.pending, delta)
+ sa.pending.Prune()
sa.queue.incPendingResource(delta)
log.Log(log.SchedApplication).Info("ask added successfully to
application",
@@ -726,10 +728,11 @@ func (sa *Application) allocateAsk(ask *Allocation)
(*resources.Resource, error)
sa.updateAskMaxPriority()
}
- delta := resources.Multiply(ask.GetAllocatedResource(), -1)
- sa.pending = resources.Add(sa.pending, delta)
+ delta := ask.GetAllocatedResource()
+ sa.pending = resources.Sub(sa.pending, delta)
+ sa.pending.Prune()
// update the pending of the queue with the same delta
- sa.queue.incPendingResource(delta)
+ sa.queue.decPendingResource(delta)
return delta, nil
}
@@ -1797,6 +1800,7 @@ func (sa *Application)
removeAllocationInternal(allocationKey string, releaseTyp
// as and when every ph gets removed (for replacement),
resource usage would be reduced.
// When real allocation happens as part of replacement, usage
would be increased again with real alloc resource
sa.allocatedPlaceholder =
resources.Sub(sa.allocatedPlaceholder, alloc.GetAllocatedResource())
+ sa.allocatedPlaceholder.Prune()
// if all the placeholders are replaced, clear the placeholder
timer
if resources.IsZero(sa.allocatedPlaceholder) {
@@ -1821,6 +1825,7 @@ func (sa *Application)
removeAllocationInternal(allocationKey string, releaseTyp
sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
} else {
sa.allocatedResource = resources.Sub(sa.allocatedResource,
alloc.GetAllocatedResource())
+ sa.allocatedResource.Prune()
// Aggregate the resources used by this alloc to the
application's resource tracker
sa.trackCompletedResource(alloc)
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 055f89bf..4f61784e 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -432,8 +432,8 @@ func TestAllocateDeallocate(t *testing.T) {
err = app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
// allocate
- if delta, err := app.AllocateAsk(aKey); err != nil ||
!resources.Equals(resources.Multiply(res, -1), delta) {
- t.Errorf("AllocateAsk() did not return correct delta, err %v,
expected %v got %v", err, resources.Multiply(res, -1), delta)
+ if delta, err := app.AllocateAsk(aKey); err != nil ||
!resources.Equals(res, delta) {
+ t.Errorf("AllocateAsk() did not return correct delta, err %v,
expected %v got %v", err, res, delta)
}
// allocate again should fail
if delta, err := app.AllocateAsk(aKey); err == nil || delta != nil {
@@ -943,7 +943,8 @@ func TestStateChangeOnPlaceholderAdd(t *testing.T) {
released = app.RemoveAllocationAsk(askID)
assert.Equal(t, released, 0, "allocation ask should not have been
reserved")
assert.Assert(t, app.IsCompleting(), "Application should have stayed
same, changed unexpectedly: %s", app.CurrentState())
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
+ // zero resource should be pruned
+ assertUserGroupResource(t, getTestUserGroup(), nil)
log := app.GetStateLog()
assert.Equal(t, len(log), 2, "wrong number of app events")
@@ -1674,7 +1675,8 @@ func TestIncAndDecUserResourceUsage(t *testing.T) {
app.decUserResourceUsage(res, false)
assertUserGroupResource(t, getTestUserGroup(), res)
app.decUserResourceUsage(res, false)
- assertUserGroupResource(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
+ // zero resource should be pruned
+ assertUserGroupResource(t, getTestUserGroup(), nil)
}
func TestIncAndDecUserResourceUsageInSameGroup(t *testing.T) {
@@ -1722,7 +1724,8 @@ func TestIncAndDecUserResourceUsageInSameGroup(t
*testing.T) {
// Decrease testuser and testuser2 to 0
app.decUserResourceUsage(res, false)
app2.decUserResourceUsage(res, false)
- assertUserResourcesAndGroupResources(t, getUserGroup("testuser",
testgroups),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), nil, 0)
+ // zero resource should be pruned
+ assertUserResourcesAndGroupResources(t, getUserGroup("testuser",
testgroups), nil, nil, 0)
}
func TestGetAllRequests(t *testing.T) {
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index db77ee68..fcce3c7f 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -202,6 +202,7 @@ func (sn *Node) refreshAvailableResource() {
sn.availableResource = sn.totalResource.Clone()
sn.availableResource.SubFrom(sn.allocatedResource)
sn.availableResource.SubFrom(sn.occupiedResource)
+ sn.availableResource.Prune()
// check if any quantity is negative: a nil resource is all 0's
if !resources.StrictlyGreaterThanOrEquals(sn.availableResource, nil) {
log.Log(log.SchedNode).Warn("Node update triggered over
allocated node",
@@ -311,6 +312,7 @@ func (sn *Node) RemoveAllocation(allocationKey string)
*Allocation {
if alloc != nil {
delete(sn.allocations, allocationKey)
sn.allocatedResource.SubFrom(alloc.GetAllocatedResource())
+ sn.allocatedResource.Prune()
sn.availableResource.AddTo(alloc.GetAllocatedResource())
sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID,
alloc.allocationKey, alloc.allocatedResource)
return alloc
@@ -353,6 +355,7 @@ func (sn *Node) addAllocationInternal(alloc *Allocation,
force bool) bool {
sn.allocations[alloc.GetAllocationKey()] = alloc
sn.allocatedResource.AddTo(res)
sn.availableResource.SubFrom(res)
+ sn.availableResource.Prune()
sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID,
alloc.allocationKey, res)
result = true
return result
@@ -377,6 +380,7 @@ func (sn *Node) ReplaceAllocation(allocationKey string,
replace *Allocation, del
// The allocatedResource and availableResource should be updated in the
same way
sn.allocatedResource.AddTo(delta)
sn.availableResource.SubFrom(delta)
+ sn.availableResource.Prune()
if !before.FitIn(sn.allocatedResource) {
log.Log(log.SchedNode).Warn("unexpected increase in node usage
after placeholder replacement",
zap.String("placeholder allocationKey", allocationKey),
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index dd13b22b..2668183a 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -755,6 +755,11 @@ func (sq *Queue) decPendingResource(delta
*resources.Resource) {
zap.Error(err))
} else {
sq.updatePendingResourceMetrics()
+ // We should update the metrics before pruning the resource.
+ // For example:
+ // If we prune the resource first and the resource becomes nil
after pruning,
+ // the metrics will not be updated with the nil resource, which is
not expected.
+ sq.pending.Prune()
}
}
@@ -1085,7 +1090,12 @@ func (sq *Queue) DecAllocatedResource(alloc
*resources.Resource) error {
defer sq.Unlock()
// all OK update the queue
sq.allocatedResource = resources.Sub(sq.allocatedResource, alloc)
+ // We should update the metrics before pruning the resource.
+ // For example:
+ // If we prune the resource first and the resource becomes nil after
pruning,
+ // the metrics will not be updated with the nil resource, which is not
expected.
sq.updateAllocatedResourceMetrics()
+ sq.allocatedResource.Prune()
return nil
}
@@ -1118,6 +1128,11 @@ func (sq *Queue) DecPreemptingResource(alloc
*resources.Resource) {
sq.parent.DecPreemptingResource(alloc)
sq.preemptingResource = resources.Sub(sq.preemptingResource, alloc)
sq.updatePreemptingResourceMetrics()
+ // We should update the metrics before pruning the resource.
+ // For example:
+ // If we prune the resource first and the resource becomes nil after
pruning,
+ // the metrics will not be updated with the nil resource, which is not
expected.
+ sq.preemptingResource.Prune()
}
func (sq *Queue) IsPrioritySortEnabled() bool {
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index bc46b72f..d4083a9b 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -281,7 +281,8 @@ func TestRemoveNodeWithAllocations(t *testing.T) {
assert.Equal(t, 1, len(released), "node did not release correct
allocation")
assert.Equal(t, 0, len(confirmed), "node did not confirm correct
allocation")
assert.Equal(t, released[0].GetAllocationKey(), allocAllocationKey,
"allocationKey returned by release not the same as on allocation")
- assertLimits(t, getTestUserGroup(), resources.Zero)
+ // zero resource should be pruned
+ assertLimits(t, getTestUserGroup(), nil)
assert.NilError(t, err, "the event should have been processed")
}
@@ -346,7 +347,8 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
assert.Assert(t, resources.Equals(app.GetPendingResource(), appRes),
"app should have updated pending resources")
// check the interim state of the placeholder involved
assert.Check(t, !ph.HasRelease(), "placeholder should not have release
linked anymore")
- assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
+ // zero resource should be pruned
+ assertLimits(t, getTestUserGroup(), nil)
}
func TestCalculateNodesResourceUsage(t *testing.T) {
@@ -1077,7 +1079,8 @@ func TestRemoveApp(t *testing.T) {
allocs = partition.removeApplication("will_not_remove")
assert.Equal(t, 1, len(allocs), "existing application with allocations
returned unexpected allocations %v", allocs)
- assertLimits(t, getTestUserGroup(), resources.Zero)
+ // zero resource should be pruned
+ assertLimits(t, getTestUserGroup(), nil)
}
func TestRemoveAppAllocs(t *testing.T) {
@@ -1139,7 +1142,8 @@ func TestRemoveAppAllocs(t *testing.T) {
allocs, _ = partition.removeAllocation(release)
assert.Equal(t, 1, len(allocs), "removal request for existing
allocation returned wrong allocations: %v", allocs)
assert.Equal(t, 0, partition.GetTotalAllocationCount(), "removal
requests did not remove all allocations: %v", partition.allocations)
- assertLimits(t, getTestUserGroup(), resources.Zero)
+ // zero resource should be pruned
+ assertLimits(t, getTestUserGroup(), nil)
}
func TestRemoveAllPlaceholderAllocs(t *testing.T) {
@@ -2218,7 +2222,8 @@ func setupPreemptionForRequiredNode(t *testing.T)
(*PartitionContext, *objects.A
}
releases, _ := partition.removeAllocation(release)
assert.Equal(t, 0, len(releases), "not expecting any released
allocations")
- assertUserGroupResourceMaxLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 0}),
getExpectedQueuesLimitsForPreemptionWithRequiredNode())
+ // zero resource should be pruned
+ assertUserGroupResourceMaxLimits(t, getTestUserGroup(), nil,
getExpectedQueuesLimitsForPreemptionWithRequiredNode())
return partition, app
}
@@ -2974,7 +2979,8 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
assert.Assert(t, resources.IsZero(node.GetAllocatedResource()),
"nothing should be allocated on node")
assert.Assert(t,
resources.IsZero(app.GetQueue().GetAllocatedResource()), "nothing should be
allocated on queue")
assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation
should be registered on the partition")
- assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0,
"second": 0}))
+ // zero resource should be pruned
+ assertLimits(t, getTestUserGroup(), nil)
}
// one real allocation should trigger cleanup of all placeholders
@@ -3044,7 +3050,7 @@ func TestPlaceholderSmallerMulti(t *testing.T) {
assert.Assert(t,
resources.IsZero(app.GetQueue().GetAllocatedResource()), "nothing should be
allocated on queue")
assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation
should be registered on the partition")
assert.Equal(t, 0, partition.getPhAllocationCount(), "no placeholder
allocation should be on the partition")
- assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0,
"second": 0}))
+ assertLimits(t, getTestUserGroup(), nil)
}
func TestPlaceholderBiggerThanReal(t *testing.T) {
diff --git a/pkg/scheduler/ugm/queue_tracker.go
b/pkg/scheduler/ugm/queue_tracker.go
index d4e1d373..8684bf83 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -153,6 +153,7 @@ func (qt *QueueTracker) decreaseTrackedResource(hierarchy
[]string, applicationI
}
}
qt.resourceUsage.SubFrom(usage)
+ qt.resourceUsage.Prune()
if removeApp {
log.Log(log.SchedUGM).Debug("Removed application from running
applications",
zap.String("application", applicationID),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]