This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a4acda8 [YUNIKORN-2802] Prune zero values from computed resource 
types (#950)
8a4acda8 is described below

commit 8a4acda8ae96100117d83d9dc10a4a9dff6f9ed1
Author: qzhu <[email protected]>
AuthorDate: Wed Aug 21 13:38:23 2024 -0500

    [YUNIKORN-2802] Prune zero values from computed resource types (#950)
    
    Closes: #950
    
    Signed-off-by: Craig Condit <[email protected]>
---
 pkg/scheduler/objects/application.go      | 11 ++++++++---
 pkg/scheduler/objects/application_test.go | 13 ++++++++-----
 pkg/scheduler/objects/node.go             |  4 ++++
 pkg/scheduler/objects/queue.go            | 15 +++++++++++++++
 pkg/scheduler/partition_test.go           | 20 +++++++++++++-------
 pkg/scheduler/ugm/queue_tracker.go        |  1 +
 6 files changed, 49 insertions(+), 15 deletions(-)

diff --git a/pkg/scheduler/objects/application.go 
b/pkg/scheduler/objects/application.go
index 1ccde74e..4b84c957 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -581,6 +581,7 @@ func (sa *Application) removeAsksInternal(allocKey string, 
detail si.EventRecord
                        if !ask.IsAllocated() {
                                deltaPendingResource = 
ask.GetAllocatedResource()
                                sa.pending = resources.Sub(sa.pending, 
deltaPendingResource)
+                               sa.pending.Prune()
                        }
                        delete(sa.requests, allocKey)
                        sa.sortedRequests.remove(ask)
@@ -648,6 +649,7 @@ func (sa *Application) AddAllocationAsk(ask *Allocation) 
error {
        // Update total pending resource
        delta.SubFrom(oldAskResource)
        sa.pending = resources.Add(sa.pending, delta)
+       sa.pending.Prune()
        sa.queue.incPendingResource(delta)
 
        log.Log(log.SchedApplication).Info("ask added successfully to 
application",
@@ -726,10 +728,11 @@ func (sa *Application) allocateAsk(ask *Allocation) 
(*resources.Resource, error)
                sa.updateAskMaxPriority()
        }
 
-       delta := resources.Multiply(ask.GetAllocatedResource(), -1)
-       sa.pending = resources.Add(sa.pending, delta)
+       delta := ask.GetAllocatedResource()
+       sa.pending = resources.Sub(sa.pending, delta)
+       sa.pending.Prune()
        // update the pending of the queue with the same delta
-       sa.queue.incPendingResource(delta)
+       sa.queue.decPendingResource(delta)
 
        return delta, nil
 }
@@ -1797,6 +1800,7 @@ func (sa *Application) 
removeAllocationInternal(allocationKey string, releaseTyp
                // as and when every ph gets removed (for replacement), 
resource usage would be reduced.
                // When real allocation happens as part of replacement, usage 
would be increased again with real alloc resource
                sa.allocatedPlaceholder = 
resources.Sub(sa.allocatedPlaceholder, alloc.GetAllocatedResource())
+               sa.allocatedPlaceholder.Prune()
 
                // if all the placeholders are replaced, clear the placeholder 
timer
                if resources.IsZero(sa.allocatedPlaceholder) {
@@ -1821,6 +1825,7 @@ func (sa *Application) 
removeAllocationInternal(allocationKey string, releaseTyp
                sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
        } else {
                sa.allocatedResource = resources.Sub(sa.allocatedResource, 
alloc.GetAllocatedResource())
+               sa.allocatedResource.Prune()
 
                // Aggregate the resources used by this alloc to the 
application's resource tracker
                sa.trackCompletedResource(alloc)
diff --git a/pkg/scheduler/objects/application_test.go 
b/pkg/scheduler/objects/application_test.go
index 055f89bf..4f61784e 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -432,8 +432,8 @@ func TestAllocateDeallocate(t *testing.T) {
        err = app.AddAllocationAsk(ask)
        assert.NilError(t, err, "ask should have been added to app")
        // allocate
-       if delta, err := app.AllocateAsk(aKey); err != nil || 
!resources.Equals(resources.Multiply(res, -1), delta) {
-               t.Errorf("AllocateAsk() did not return correct delta, err %v, 
expected %v got %v", err, resources.Multiply(res, -1), delta)
+       if delta, err := app.AllocateAsk(aKey); err != nil || 
!resources.Equals(res, delta) {
+               t.Errorf("AllocateAsk() did not return correct delta, err %v, 
expected %v got %v", err, res, delta)
        }
        // allocate again should fail
        if delta, err := app.AllocateAsk(aKey); err == nil || delta != nil {
@@ -943,7 +943,8 @@ func TestStateChangeOnPlaceholderAdd(t *testing.T) {
        released = app.RemoveAllocationAsk(askID)
        assert.Equal(t, released, 0, "allocation ask should not have been 
reserved")
        assert.Assert(t, app.IsCompleting(), "Application should have stayed 
same, changed unexpectedly: %s", app.CurrentState())
-       assertUserGroupResource(t, getTestUserGroup(), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
+       // zero resource should be pruned
+       assertUserGroupResource(t, getTestUserGroup(), nil)
 
        log := app.GetStateLog()
        assert.Equal(t, len(log), 2, "wrong number of app events")
@@ -1674,7 +1675,8 @@ func TestIncAndDecUserResourceUsage(t *testing.T) {
        app.decUserResourceUsage(res, false)
        assertUserGroupResource(t, getTestUserGroup(), res)
        app.decUserResourceUsage(res, false)
-       assertUserGroupResource(t, getTestUserGroup(), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
+       // zero resource should be pruned
+       assertUserGroupResource(t, getTestUserGroup(), nil)
 }
 
 func TestIncAndDecUserResourceUsageInSameGroup(t *testing.T) {
@@ -1722,7 +1724,8 @@ func TestIncAndDecUserResourceUsageInSameGroup(t 
*testing.T) {
        // Decrease testuser and testuser2 to 0
        app.decUserResourceUsage(res, false)
        app2.decUserResourceUsage(res, false)
-       assertUserResourcesAndGroupResources(t, getUserGroup("testuser", 
testgroups), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), nil, 0)
+       // zero resource should be pruned
+       assertUserResourcesAndGroupResources(t, getUserGroup("testuser", 
testgroups), nil, nil, 0)
 }
 
 func TestGetAllRequests(t *testing.T) {
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index db77ee68..fcce3c7f 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -202,6 +202,7 @@ func (sn *Node) refreshAvailableResource() {
        sn.availableResource = sn.totalResource.Clone()
        sn.availableResource.SubFrom(sn.allocatedResource)
        sn.availableResource.SubFrom(sn.occupiedResource)
+       sn.availableResource.Prune()
        // check if any quantity is negative: a nil resource is all 0's
        if !resources.StrictlyGreaterThanOrEquals(sn.availableResource, nil) {
                log.Log(log.SchedNode).Warn("Node update triggered over 
allocated node",
@@ -311,6 +312,7 @@ func (sn *Node) RemoveAllocation(allocationKey string) 
*Allocation {
        if alloc != nil {
                delete(sn.allocations, allocationKey)
                sn.allocatedResource.SubFrom(alloc.GetAllocatedResource())
+               sn.allocatedResource.Prune()
                sn.availableResource.AddTo(alloc.GetAllocatedResource())
                sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID, 
alloc.allocationKey, alloc.allocatedResource)
                return alloc
@@ -353,6 +355,7 @@ func (sn *Node) addAllocationInternal(alloc *Allocation, 
force bool) bool {
                sn.allocations[alloc.GetAllocationKey()] = alloc
                sn.allocatedResource.AddTo(res)
                sn.availableResource.SubFrom(res)
+               sn.availableResource.Prune()
                sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID, 
alloc.allocationKey, res)
                result = true
                return result
@@ -377,6 +380,7 @@ func (sn *Node) ReplaceAllocation(allocationKey string, 
replace *Allocation, del
        // The allocatedResource and availableResource should be updated in the 
same way
        sn.allocatedResource.AddTo(delta)
        sn.availableResource.SubFrom(delta)
+       sn.availableResource.Prune()
        if !before.FitIn(sn.allocatedResource) {
                log.Log(log.SchedNode).Warn("unexpected increase in node usage 
after placeholder replacement",
                        zap.String("placeholder allocationKey", allocationKey),
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index dd13b22b..2668183a 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -755,6 +755,11 @@ func (sq *Queue) decPendingResource(delta 
*resources.Resource) {
                        zap.Error(err))
        } else {
                sq.updatePendingResourceMetrics()
+               // We should update the metrics before pruning the resource.
+               // For example:
// If we prune the resource first and the resource becomes nil 
after pruning,
+               // the metrics will not be updated with nil resource, this is 
not expected.
+               sq.pending.Prune()
        }
 }
 
@@ -1085,7 +1090,12 @@ func (sq *Queue) DecAllocatedResource(alloc 
*resources.Resource) error {
        defer sq.Unlock()
        // all OK update the queue
        sq.allocatedResource = resources.Sub(sq.allocatedResource, alloc)
+       // We should update the metrics before pruning the resource.
+       // For example:
// If we prune the resource first and the resource becomes nil after 
pruning,
+       // the metrics will not be updated with nil resource, this is not 
expected.
        sq.updateAllocatedResourceMetrics()
+       sq.allocatedResource.Prune()
        return nil
 }
 
@@ -1118,6 +1128,11 @@ func (sq *Queue) DecPreemptingResource(alloc 
*resources.Resource) {
        sq.parent.DecPreemptingResource(alloc)
        sq.preemptingResource = resources.Sub(sq.preemptingResource, alloc)
        sq.updatePreemptingResourceMetrics()
+       // We should update the metrics before pruning the resource.
+       // For example:
// If we prune the resource first and the resource becomes nil 
after pruning,
+       // the metrics will not be updated with nil resource, this is not 
expected.
+       sq.preemptingResource.Prune()
 }
 
 func (sq *Queue) IsPrioritySortEnabled() bool {
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index bc46b72f..d4083a9b 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -281,7 +281,8 @@ func TestRemoveNodeWithAllocations(t *testing.T) {
        assert.Equal(t, 1, len(released), "node did not release correct 
allocation")
        assert.Equal(t, 0, len(confirmed), "node did not confirm correct 
allocation")
        assert.Equal(t, released[0].GetAllocationKey(), allocAllocationKey, 
"allocationKey returned by release not the same as on allocation")
-       assertLimits(t, getTestUserGroup(), resources.Zero)
+       // zero resource should be pruned
+       assertLimits(t, getTestUserGroup(), nil)
 
        assert.NilError(t, err, "the event should have been processed")
 }
@@ -346,7 +347,8 @@ func TestRemoveNodeWithPlaceholders(t *testing.T) {
        assert.Assert(t, resources.Equals(app.GetPendingResource(), appRes), 
"app should have updated pending resources")
        // check the interim state of the placeholder involved
        assert.Check(t, !ph.HasRelease(), "placeholder should not have release 
linked anymore")
-       assertLimits(t, getTestUserGroup(), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}))
+       // zero resource should be pruned
+       assertLimits(t, getTestUserGroup(), nil)
 }
 
 func TestCalculateNodesResourceUsage(t *testing.T) {
@@ -1077,7 +1079,8 @@ func TestRemoveApp(t *testing.T) {
 
        allocs = partition.removeApplication("will_not_remove")
        assert.Equal(t, 1, len(allocs), "existing application with allocations 
returned unexpected allocations %v", allocs)
-       assertLimits(t, getTestUserGroup(), resources.Zero)
+       // zero resource should be pruned
+       assertLimits(t, getTestUserGroup(), nil)
 }
 
 func TestRemoveAppAllocs(t *testing.T) {
@@ -1139,7 +1142,8 @@ func TestRemoveAppAllocs(t *testing.T) {
        allocs, _ = partition.removeAllocation(release)
        assert.Equal(t, 1, len(allocs), "removal request for existing 
allocation returned wrong allocations: %v", allocs)
        assert.Equal(t, 0, partition.GetTotalAllocationCount(), "removal 
requests did not remove all allocations: %v", partition.allocations)
-       assertLimits(t, getTestUserGroup(), resources.Zero)
+       // zero resource should be pruned
+       assertLimits(t, getTestUserGroup(), nil)
 }
 
 func TestRemoveAllPlaceholderAllocs(t *testing.T) {
@@ -2218,7 +2222,8 @@ func setupPreemptionForRequiredNode(t *testing.T) 
(*PartitionContext, *objects.A
        }
        releases, _ := partition.removeAllocation(release)
        assert.Equal(t, 0, len(releases), "not expecting any released 
allocations")
-       assertUserGroupResourceMaxLimits(t, getTestUserGroup(), 
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 0}), 
getExpectedQueuesLimitsForPreemptionWithRequiredNode())
+       // zero resource should be pruned
+       assertUserGroupResourceMaxLimits(t, getTestUserGroup(), nil, 
getExpectedQueuesLimitsForPreemptionWithRequiredNode())
        return partition, app
 }
 
@@ -2974,7 +2979,8 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
        assert.Assert(t, resources.IsZero(node.GetAllocatedResource()), 
"nothing should be allocated on node")
        assert.Assert(t, 
resources.IsZero(app.GetQueue().GetAllocatedResource()), "nothing should be 
allocated on queue")
        assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation 
should be registered on the partition")
-       assertLimits(t, getTestUserGroup(), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0, 
"second": 0}))
+       // zero resource should be pruned
+       assertLimits(t, getTestUserGroup(), nil)
 }
 
 // one real allocation should trigger cleanup of all placeholders
@@ -3044,7 +3050,7 @@ func TestPlaceholderSmallerMulti(t *testing.T) {
        assert.Assert(t, 
resources.IsZero(app.GetQueue().GetAllocatedResource()), "nothing should be 
allocated on queue")
        assert.Equal(t, 0, partition.GetTotalAllocationCount(), "no allocation 
should be registered on the partition")
        assert.Equal(t, 0, partition.getPhAllocationCount(), "no placeholder 
allocation should be on the partition")
-       assertLimits(t, getTestUserGroup(), 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0, 
"second": 0}))
+       assertLimits(t, getTestUserGroup(), nil)
 }
 
 func TestPlaceholderBiggerThanReal(t *testing.T) {
diff --git a/pkg/scheduler/ugm/queue_tracker.go 
b/pkg/scheduler/ugm/queue_tracker.go
index d4e1d373..8684bf83 100644
--- a/pkg/scheduler/ugm/queue_tracker.go
+++ b/pkg/scheduler/ugm/queue_tracker.go
@@ -153,6 +153,7 @@ func (qt *QueueTracker) decreaseTrackedResource(hierarchy 
[]string, applicationI
                }
        }
        qt.resourceUsage.SubFrom(usage)
+       qt.resourceUsage.Prune()
        if removeApp {
                log.Log(log.SchedUGM).Debug("Removed application from running 
applications",
                        zap.String("application", applicationID),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to