This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new 37c1ee86 [YUNIKORN-3036] fix race prevention regression (#1014)
37c1ee86 is described below

commit 37c1ee86264d4b298fbb641589557291eabb74a0
Author: Wilfred Spiegelenburg <wilfr...@apache.org>
AuthorDate: Mon Mar 3 11:26:58 2025 -0600

    [YUNIKORN-3036] fix race prevention regression (#1014)
    
    Re-instate the change from YUNIKORN-1993 that was reversed as part of
    YUNIKORN-2658. Add more explicit comment to not move the code.
    Minor cleanup in function naming and fix the function length of
    removeAllocation (lint fix)
    
    Closes: #1014
    
    Signed-off-by: Craig Condit <ccon...@apache.org>
    (cherry picked from commit 9eee423b3df00cc9dbe98ae05888ddf525753a07)
---
 pkg/scheduler/partition.go           | 54 +++++++++++++++------------
 pkg/scheduler/partition_test.go      | 71 +++++++++---------------------------
 pkg/scheduler/tests/recovery_test.go |  4 +-
 3 files changed, 48 insertions(+), 81 deletions(-)
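
For context, a minimal sketch of the ordering this change restores, using illustrative stand-in types rather than the real scheduler objects (the actual code is in the partition.go hunks below):

    package sketch

    type queue struct{ allocated int }

    type application struct{ queue *queue }

    func (a *application) GetQueue() *queue { return a.queue }

    // removeAllocation mirrors the restored ordering: look the queue up before
    // any allocation is released, so a later state change that unlinks the
    // queue from the application cannot leave resources counted as allocated.
    func removeAllocation(app *application, releasedCount int) {
        q := app.GetQueue() // must happen before the releases are processed

        // ... release the allocation(s) from the application and nodes ...

        q.allocated -= releasedCount // accounting still reaches the right queue
    }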

diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 47c0d30c..b4b6e326 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -1354,7 +1354,9 @@ func (pc *PartitionContext) calculateNodesResourceUsage() map[string][]int {
        return mapResult
 }
 
-func (pc *PartitionContext) generateReleased(release *si.AllocationRelease, app *objects.Application) []*objects.Allocation {
+// processAllocationRelease processes the releases from the RM and removes the allocation(s) as requested.
+// Updates the application which can trigger an application state change.
+func (pc *PartitionContext) processAllocationRelease(release *si.AllocationRelease, app *objects.Application) []*objects.Allocation {
        released := make([]*objects.Allocation, 0)
        // when allocationKey is not specified, remove all allocations from the app
        allocationKey := release.GetAllocationKey()
@@ -1386,7 +1388,7 @@ func (pc *PartitionContext) generateReleased(release *si.AllocationRelease, app
 
 // removeAllocation removes the referenced allocation(s) from the applications and nodes
 // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
-func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*objects.Allocation, *objects.Allocation) { //nolint:funlen
+func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*objects.Allocation, *objects.Allocation) {
        if release == nil {
                return nil, nil
        }
@@ -1402,25 +1404,20 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
                return nil, nil
        }
 
-       // temp store for allocations manipulated
-       released := pc.generateReleased(release, app)
-       var confirmed *objects.Allocation
+       // **** DO NOT MOVE **** this must be called before any allocations are released.
+       // Processing a removal while in the Completing state could race with the state change. The race occurs between
+       // removing the allocation and updating the queue after node processing. If the state change removes the queue link
+       // before we get to updating the queue after the node we leave the resources as allocated on the queue. The queue
+       // will always exist at this point. Retrieving the queue now sidesteps this.
+       queue := app.GetQueue()
 
-       // all releases are collected: placeholder count needs updating for all placeholder releases
-       // regardless of what happens later
-       phReleases := 0
-       for _, r := range released {
-               if r.IsPlaceholder() {
-                       phReleases++
-               }
-       }
-       if phReleases > 0 {
-               pc.decPhAllocationCount(phReleases)
-       }
+       released := pc.processAllocationRelease(release, app)
+       pc.updatePhAllocationCount(released)
 
-       // for each allocation to release, update the node and queue.
        total := resources.NewResource()
        totalPreempting := resources.NewResource()
+       var confirmed *objects.Allocation
+       // for each allocation to release, update the node and queue.
        for _, alloc := range released {
                node := pc.GetNode(alloc.GetNodeID())
                if node == nil {
@@ -1471,13 +1468,6 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
                }
        }
 
-       // Processing a removal while in the Completing state could race with the state change.
-       // The race occurs between removing the allocation and updating the queue after node processing.
-       // If the state change removes the queue link before we get to updating the queue after the node we
-       // leave the resources as allocated on the queue. The queue cannot be removed yet at this point as
-       // there are still allocations left. So retrieve the queue early to sidestep the race.
-       queue := app.GetQueue()
-
        if resources.StrictlyGreaterThanZero(total) {
                if err := queue.DecAllocatedResource(total); err != nil {
                        log.Log(log.SchedPartition).Warn("failed to release resources from queue",
@@ -1515,6 +1505,22 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
        return released, confirmed
 }
 
+// updatePhAllocationCount checks the released allocations and updates the partition context counter of allocated
+// placeholders.
+func (pc *PartitionContext) updatePhAllocationCount(released []*objects.Allocation) {
+       // all releases are collected: placeholder count needs updating for all placeholder releases
+       // regardless of what happens later
+       phReleases := 0
+       for _, a := range released {
+               if a.IsPlaceholder() {
+                       phReleases++
+               }
+       }
+       if phReleases > 0 {
+               pc.decPhAllocationCount(phReleases)
+       }
+}
+
 func (pc *PartitionContext) GetCurrentState() string {
        return pc.stateMachine.Current()
 }
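
The "DO NOT MOVE" comment in the hunk above describes a read-after-unlink race. A self-contained illustration of that race class follows; this is generic Go with hypothetical names (app, queue, complete), not YuniKorn code, and it only models why the queue reference has to be captured before release processing:

    package sketch

    import "sync"

    type queue struct {
        mu        sync.Mutex
        allocated int
    }

    type app struct {
        mu    sync.Mutex
        queue *queue // cleared when the application completes
    }

    func (a *app) getQueue() *queue {
        a.mu.Lock()
        defer a.mu.Unlock()
        return a.queue
    }

    // complete models the state change racing with the removal: it drops the
    // application's link to its queue.
    func (a *app) complete() {
        a.mu.Lock()
        a.queue = nil
        a.mu.Unlock()
    }

    // removeAllocation captures the queue before any release work. Even if
    // complete() runs while the allocations are being removed from the nodes,
    // the accounting below still reaches the original queue instead of being
    // skipped and leaving its resources marked as allocated.
    func removeAllocation(a *app, released int) {
        q := a.getQueue()

        // ... application and node processing happens here ...

        q.mu.Lock()
        q.allocated -= released
        q.mu.Unlock()
    }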
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index c6351e0a..8e88bd52 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1499,9 +1499,7 @@ func TestGetQueue(t *testing.T) {
 func TestTryAllocate(t *testing.T) {
        setupUGM()
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if result := partition.tryAllocate(); result != nil {
                t.Fatalf("empty cluster allocate returned allocation: %s", 
result)
        }
@@ -1578,12 +1576,7 @@ func TestTryAllocate(t *testing.T) {
 func TestRequiredNodeReservation(t *testing.T) {
        setupUGM()
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
-       if result := partition.tryAllocate(); result != nil {
-               t.Fatalf("empty cluster allocate returned allocation: %s", result)
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        node := partition.nodes.GetNode(nodeID1)
        if node == nil {
                t.Fatal("node-1 should have been created")
@@ -1664,9 +1657,7 @@ func TestRequiredNodeReservation(t *testing.T) {
 // allocate ask request with required node having non daemon set reservations
 func TestRequiredNodeCancelOtherReservations(t *testing.T) {
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if result := partition.tryAllocate(); result != nil {
                t.Fatalf("empty cluster allocate returned allocation: %s", 
result)
        }
@@ -1744,9 +1735,7 @@ func TestRequiredNodeCancelOtherReservations(t *testing.T) {
 // allocate ask request with required node having daemon set reservations
 func TestRequiredNodeCancelDSReservations(t *testing.T) {
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if result := partition.tryAllocate(); result != nil {
                t.Fatalf("empty cluster allocate returned allocation: %s", 
result)
        }
@@ -1829,9 +1818,7 @@ func TestRequiredNodeCancelDSReservations(t *testing.T) {
 func TestRequiredNodeNotExist(t *testing.T) {
        setupUGM()
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if result := partition.tryAllocate(); result != nil {
                t.Fatalf("empty cluster allocate returned allocation: %s", 
result)
        }
@@ -1866,9 +1853,7 @@ func TestRequiredNodeNotExist(t *testing.T) {
 // basic ds scheduling on specific node in first allocate run itself (without any need for reservation)
 func TestRequiredNodeAllocation(t *testing.T) {
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if result := partition.tryAllocate(); result != nil {
                t.Fatalf("empty cluster allocate returned allocation: %s", 
result.Request.String())
        }
@@ -2037,9 +2022,7 @@ func TestPreemptionForRequiredNodeReservedAlloc(t *testing.T) {
 
 func TestPreemptionForRequiredNodeMultipleAttemptsAvoided(t *testing.T) {
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
 
        app, testHandler := newApplicationWithHandler(appID1, "default", "root.parent.sub-leaf")
        res, err := resources.NewResourceFromConf(map[string]string{"vcore": "8"})
@@ -2119,9 +2102,7 @@ func getExpectedQueuesLimitsForPreemptionWithRequiredNode() map[string]map[strin
 // setup the partition with existing allocations so we can test preemption
 func setupPreemption(t *testing.T) (*PartitionContext, *objects.Application, *objects.Application, *objects.Allocation, *objects.Allocation) {
        partition := createPreemptionQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if result := partition.tryAllocate(); result != nil {
                t.Fatalf("empty cluster allocate returned allocation: %s", 
result)
        }
@@ -2181,9 +2162,7 @@ func setupPreemption(t *testing.T) (*PartitionContext, *objects.Application, *ob
 // setup the partition in a state that we need for multiple tests
 func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext, *objects.Application) {
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if result := partition.tryAllocate(); result != nil {
                t.Fatalf("empty cluster allocate returned allocation: %s", 
result)
        }
@@ -2261,9 +2240,7 @@ func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext, *objects.A
 func TestTryAllocateLarge(t *testing.T) {
        setupUGM()
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if result := partition.tryAllocate(); result != nil {
                t.Fatalf("empty cluster allocate returned allocation: %s", 
result)
        }
@@ -2294,9 +2271,7 @@ func TestTryAllocateLarge(t *testing.T) {
 func TestAllocReserveNewNode(t *testing.T) {
        setupUGM()
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if result := partition.tryAllocate(); result != nil {
                t.Fatalf("empty cluster allocate returned result: %s", result)
        }
@@ -2365,9 +2340,7 @@ func TestAllocReserveNewNode(t *testing.T) {
 func TestTryAllocateReserve(t *testing.T) {
        setupUGM()
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if result := partition.tryReservedAllocate(); result != nil {
                t.Fatalf("empty cluster reserved allocate returned allocation: 
%s", result)
        }
@@ -2439,9 +2412,7 @@ func TestTryAllocateReserve(t *testing.T) {
 func TestTryAllocateWithReserved(t *testing.T) {
        setupUGM()
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if alloc := partition.tryReservedAllocate(); alloc != nil {
                t.Fatalf("empty cluster reserved allocate returned allocation: 
%v", alloc)
        }
@@ -2463,9 +2434,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
 
        // reserve one node: scheduling should happen on the other
        node2 := partition.GetNode(nodeID2)
-       if node2 == nil {
-               t.Fatal("expected node-2 to be returned got nil")
-       }
+       assert.Assert(t, node2 != nil, "expected node-2 to be returned got nil")
        partition.reserve(app, node2, ask)
        if app.NodeReservedForAsk(allocKey) != nodeID2 {
                t.Fatal("reservation failure for alloc-1 and node-2")
@@ -2494,9 +2463,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
 func TestScheduleRemoveReservedAsk(t *testing.T) {
        setupUGM()
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if result := partition.tryAllocate(); result != nil {
                t.Fatalf("empty cluster allocate returned allocation: %s", 
result)
        }
@@ -2584,9 +2551,7 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
 // update the config with nodes registered and make sure that the root max and guaranteed are not changed
 func TestUpdateRootQueue(t *testing.T) {
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        res, err := resources.NewResourceFromConf(map[string]string{"vcore": "20"})
        assert.NilError(t, err, "resource creation failed")
        assert.Assert(t, resources.Equals(res, partition.totalPartitionResource), "partition resource not set as expected")
@@ -3890,9 +3855,7 @@ func TestGetNodeSortingPolicyWhenNewPartitionFromConfig(t *testing.T) {
 func TestTryAllocateMaxRunning(t *testing.T) {
        const resType = "vcore"
        partition := createQueuesNodes(t)
-       if partition == nil {
-               t.Fatal("partition create failed")
-       }
+       assert.Assert(t, partition != nil, "partition create failed")
        if result := partition.tryAllocate(); result != nil {
                t.Fatalf("empty cluster allocate returned allocation: %s", 
result)
        }
diff --git a/pkg/scheduler/tests/recovery_test.go b/pkg/scheduler/tests/recovery_test.go
index eeb9b8f4..c7d21912 100644
--- a/pkg/scheduler/tests/recovery_test.go
+++ b/pkg/scheduler/tests/recovery_test.go
@@ -675,9 +675,7 @@ func TestAppRecovery(t *testing.T) {
        mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
 
        app := serviceContext.Scheduler.GetClusterContext().GetApplication(appID1, "[rm:123]default")
-       if app == nil {
-               t.Fatal("application not found after recovery")
-       }
+       assert.Assert(t, app != nil, "application not found after recovery")
        assert.Equal(t, app.ApplicationID, appID1)
        assert.Equal(t, app.GetQueuePath(), "root.a")
 }
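
The test changes above replace the if-nil-then-t.Fatal checks with the assert helpers already used elsewhere in these files; the assert.Assert and assert.NilError calls in the diff match the gotest.tools/v3/assert package. Assuming that, a minimal, hypothetical example of the pattern (newPartition is a stand-in constructor, not a YuniKorn helper):

    package sketch_test

    import (
        "testing"

        "gotest.tools/v3/assert"
    )

    type partition struct{}

    func newPartition() *partition { return &partition{} }

    func TestPartitionCreated(t *testing.T) {
        p := newPartition()
        // assert.Assert stops the test immediately when the condition is
        // false, matching the behaviour of the replaced t.Fatal calls.
        assert.Assert(t, p != nil, "partition create failed")
    }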

