This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new 37c1ee86 [YUNIKORN-3036] fix race prevention regression (#1014)
37c1ee86 is described below

commit 37c1ee86264d4b298fbb641589557291eabb74a0
Author: Wilfred Spiegelenburg <wilfr...@apache.org>
AuthorDate: Mon Mar 3 11:26:58 2025 -0600

    [YUNIKORN-3036] fix race prevention regression (#1014)

    Re-instate the change from YUNIKORN-1993 that was reversed as part of
    YUNIKORN-2658. Add more explicit comment to not move the code.
    Minor cleanup in function naming and fix the function length of
    removeAllocation (lint fix)

    Closes: #1014

    Signed-off-by: Craig Condit <ccon...@apache.org>
    (cherry picked from commit 9eee423b3df00cc9dbe98ae05888ddf525753a07)
---
 pkg/scheduler/partition.go           | 54 +++++++++++++++------------
 pkg/scheduler/partition_test.go      | 71 +++++++++---------------------
 pkg/scheduler/tests/recovery_test.go |  4 +-
 3 files changed, 48 insertions(+), 81 deletions(-)

diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 47c0d30c..b4b6e326 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -1354,7 +1354,9 @@ func (pc *PartitionContext) calculateNodesResourceUsage() map[string][]int {
     return mapResult
 }
 
-func (pc *PartitionContext) generateReleased(release *si.AllocationRelease, app *objects.Application) []*objects.Allocation {
+// processAllocationRelease processes the releases from the RM and removes the allocation(s) as requested.
+// Updates the application which can trigger an application state change.
+func (pc *PartitionContext) processAllocationRelease(release *si.AllocationRelease, app *objects.Application) []*objects.Allocation {
     released := make([]*objects.Allocation, 0)
     // when allocationKey is not specified, remove all allocations from the app
     allocationKey := release.GetAllocationKey()
@@ -1386,7 +1388,7 @@
 
 // removeAllocation removes the referenced allocation(s) from the applications and nodes
 // NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
-func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*objects.Allocation, *objects.Allocation) { //nolint:funlen
+func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*objects.Allocation, *objects.Allocation) {
     if release == nil {
         return nil, nil
     }
@@ -1402,25 +1404,20 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
         return nil, nil
     }
 
-    // temp store for allocations manipulated
-    released := pc.generateReleased(release, app)
-    var confirmed *objects.Allocation
+    // **** DO NOT MOVE **** this must be called before any allocations are released.
+    // Processing a removal while in the Completing state could race with the state change. The race occurs between
+    // removing the allocation and updating the queue after node processing. If the state change removes the queue link
+    // before we get to updating the queue after the node we leave the resources as allocated on the queue. The queue
+    // will always exist at this point. Retrieving the queue now sidesteps this.
+    queue := app.GetQueue()
 
-    // all releases are collected: placeholder count needs updating for all placeholder releases
-    // regardless of what happens later
-    phReleases := 0
-    for _, r := range released {
-        if r.IsPlaceholder() {
-            phReleases++
-        }
-    }
-    if phReleases > 0 {
-        pc.decPhAllocationCount(phReleases)
-    }
+    released := pc.processAllocationRelease(release, app)
+    pc.updatePhAllocationCount(released)
 
-    // for each allocation to release, update the node and queue.
     total := resources.NewResource()
     totalPreempting := resources.NewResource()
+    var confirmed *objects.Allocation
+    // for each allocation to release, update the node and queue.
     for _, alloc := range released {
         node := pc.GetNode(alloc.GetNodeID())
         if node == nil {
@@ -1471,13 +1468,6 @@
         }
     }
 
-    // Processing a removal while in the Completing state could race with the state change.
-    // The race occurs between removing the allocation and updating the queue after node processing.
-    // If the state change removes the queue link before we get to updating the queue after the node we
-    // leave the resources as allocated on the queue. The queue cannot be removed yet at this point as
-    // there are still allocations left. So retrieve the queue early to sidestep the race.
-    queue := app.GetQueue()
-
     if resources.StrictlyGreaterThanZero(total) {
         if err := queue.DecAllocatedResource(total); err != nil {
             log.Log(log.SchedPartition).Warn("failed to release resources from queue",
@@ -1515,6 +1505,22 @@
         }
     }
     return released, confirmed
 }
 
+// updatePhAllocationCount checks the released allocations and updates the partition context counter of allocated
+// placeholders.
+func (pc *PartitionContext) updatePhAllocationCount(released []*objects.Allocation) {
+    // all releases are collected: placeholder count needs updating for all placeholder releases
+    // regardless of what happens later
+    phReleases := 0
+    for _, a := range released {
+        if a.IsPlaceholder() {
+            phReleases++
+        }
+    }
+    if phReleases > 0 {
+        pc.decPhAllocationCount(phReleases)
+    }
+}
+
 func (pc *PartitionContext) GetCurrentState() string {
     return pc.stateMachine.Current()
 }
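The new **** DO NOT MOVE **** comment carries the actual fix: the queue reference must be captured before processAllocationRelease can trigger the application state change that drops the queue link. Below is a minimal, self-contained Go sketch of that ordering; the types and names are illustrative stand-ins, not the real PartitionContext, Application or Queue objects.

package main

import "fmt"

// queue tracks resources currently counted as allocated against it.
type queue struct {
	allocated int
}

// app holds a link to its queue; a state change can clear that link.
type app struct {
	queue *queue
}

// getQueue returns the queue the application is still linked to (nil once the
// Completing -> Completed style state change has run in this toy model).
func (a *app) getQueue() *queue { return a.queue }

// processRelease stands in for processAllocationRelease: updating the
// application while removing the allocation can trigger the state change
// that drops the queue link.
func processRelease(a *app) int {
	a.queue = nil // state change: queue link removed
	return 4      // resources freed by this release
}

// removeAllocation mirrors the ordering the patch re-instates: capture the
// queue reference BEFORE processing the release, then update the queue
// through the saved reference. If getQueue were called after processRelease,
// the link would already be gone and the 4 units would stay counted as
// allocated on the queue.
func removeAllocation(a *app) {
	q := a.getQueue() // **** DO NOT MOVE **** must run before processRelease
	released := processRelease(a)
	q.allocated -= released
}

func main() {
	q := &queue{allocated: 10}
	a := &app{queue: q}
	removeAllocation(a)
	fmt.Println("allocated left on the queue:", q.allocated) // prints 6
}

In the real partition code the queue object itself still exists at that point (as the comment notes); the sketch only makes the consequence of reordering the two calls visible.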
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index c6351e0a..8e88bd52 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1499,9 +1497,7 @@ func TestGetQueue(t *testing.T) {
 func TestTryAllocate(t *testing.T) {
     setupUGM()
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if result := partition.tryAllocate(); result != nil {
         t.Fatalf("empty cluster allocate returned allocation: %s", result)
     }
@@ -1578,12 +1576,7 @@ func TestTryAllocate(t *testing.T) {
 func TestRequiredNodeReservation(t *testing.T) {
     setupUGM()
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
-    if result := partition.tryAllocate(); result != nil {
-        t.Fatalf("empty cluster allocate returned allocation: %s", result)
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     node := partition.nodes.GetNode(nodeID1)
     if node == nil {
         t.Fatal("node-1 should have been created")
@@ -1664,9 +1657,7 @@ func TestRequiredNodeReservation(t *testing.T) {
 // allocate ask request with required node having non daemon set reservations
 func TestRequiredNodeCancelOtherReservations(t *testing.T) {
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if result := partition.tryAllocate(); result != nil {
         t.Fatalf("empty cluster allocate returned allocation: %s", result)
     }
@@ -1744,9 +1735,7 @@ func TestRequiredNodeCancelOtherReservations(t *testing.T) {
 // allocate ask request with required node having daemon set reservations
 func TestRequiredNodeCancelDSReservations(t *testing.T) {
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if result := partition.tryAllocate(); result != nil {
         t.Fatalf("empty cluster allocate returned allocation: %s", result)
     }
@@ -1829,9 +1818,7 @@ func TestRequiredNodeCancelDSReservations(t *testing.T) {
 func TestRequiredNodeNotExist(t *testing.T) {
     setupUGM()
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if result := partition.tryAllocate(); result != nil {
         t.Fatalf("empty cluster allocate returned allocation: %s", result)
     }
@@ -1866,9 +1853,7 @@ func TestRequiredNodeNotExist(t *testing.T) {
 // basic ds scheduling on specific node in first allocate run itself (without any need for reservation)
 func TestRequiredNodeAllocation(t *testing.T) {
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if result := partition.tryAllocate(); result != nil {
         t.Fatalf("empty cluster allocate returned allocation: %s", result.Request.String())
     }
@@ -2037,9 +2022,7 @@ func TestPreemptionForRequiredNodeReservedAlloc(t *testing.T) {
 
 func TestPreemptionForRequiredNodeMultipleAttemptsAvoided(t *testing.T) {
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
 
     app, testHandler := newApplicationWithHandler(appID1, "default", "root.parent.sub-leaf")
     res, err := resources.NewResourceFromConf(map[string]string{"vcore": "8"})
@@ -2119,9 +2102,7 @@ func getExpectedQueuesLimitsForPreemptionWithRequiredNode() map[string]map[strin
 // setup the partition with existing allocations so we can test preemption
 func setupPreemption(t *testing.T) (*PartitionContext, *objects.Application, *objects.Application, *objects.Allocation, *objects.Allocation) {
     partition := createPreemptionQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if result := partition.tryAllocate(); result != nil {
         t.Fatalf("empty cluster allocate returned allocation: %s", result)
     }
@@ -2181,9 +2162,7 @@ func setupPreemption(t *testing.T) (*PartitionContext, *objects.Application, *ob
 // setup the partition in a state that we need for multiple tests
 func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext, *objects.Application) {
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if result := partition.tryAllocate(); result != nil {
         t.Fatalf("empty cluster allocate returned allocation: %s", result)
     }
@@ -2261,9 +2240,7 @@ func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext, *objects.A
 func TestTryAllocateLarge(t *testing.T) {
     setupUGM()
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if result := partition.tryAllocate(); result != nil {
         t.Fatalf("empty cluster allocate returned allocation: %s", result)
     }
@@ -2294,9 +2271,7 @@ func TestTryAllocateLarge(t *testing.T) {
 func TestAllocReserveNewNode(t *testing.T) {
     setupUGM()
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if result := partition.tryAllocate(); result != nil {
         t.Fatalf("empty cluster allocate returned result: %s", result)
     }
@@ -2365,9 +2340,7 @@ func TestAllocReserveNewNode(t *testing.T) {
 func TestTryAllocateReserve(t *testing.T) {
     setupUGM()
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if result := partition.tryReservedAllocate(); result != nil {
         t.Fatalf("empty cluster reserved allocate returned allocation: %s", result)
     }
@@ -2439,9 +2412,7 @@ func TestTryAllocateReserve(t *testing.T) {
 func TestTryAllocateWithReserved(t *testing.T) {
     setupUGM()
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if alloc := partition.tryReservedAllocate(); alloc != nil {
         t.Fatalf("empty cluster reserved allocate returned allocation: %v", alloc)
     }
@@ -2463,9 +2434,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
     // reserve one node: scheduling should happen on the other
     node2 := partition.GetNode(nodeID2)
-    if node2 == nil {
-        t.Fatal("expected node-2 to be returned got nil")
-    }
+    assert.Assert(t, node2 != nil, "expected node-2 to be returned got nil")
     partition.reserve(app, node2, ask)
     if app.NodeReservedForAsk(allocKey) != nodeID2 {
         t.Fatal("reservation failure for alloc-1 and node-2")
     }
@@ -2494,9 +2463,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
 func TestScheduleRemoveReservedAsk(t *testing.T) {
     setupUGM()
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if result := partition.tryAllocate(); result != nil {
         t.Fatalf("empty cluster allocate returned allocation: %s", result)
     }
@@ -2584,9 +2551,7 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
 // update the config with nodes registered and make sure that the root max and guaranteed are not changed
 func TestUpdateRootQueue(t *testing.T) {
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     res, err := resources.NewResourceFromConf(map[string]string{"vcore": "20"})
     assert.NilError(t, err, "resource creation failed")
     assert.Assert(t, resources.Equals(res, partition.totalPartitionResource), "partition resource not set as expected")
@@ -3890,9 +3855,7 @@ func TestGetNodeSortingPolicyWhenNewPartitionFromConfig(t *testing.T) {
 func TestTryAllocateMaxRunning(t *testing.T) {
     const resType = "vcore"
     partition := createQueuesNodes(t)
-    if partition == nil {
-        t.Fatal("partition create failed")
-    }
+    assert.Assert(t, partition != nil, "partition create failed")
     if result := partition.tryAllocate(); result != nil {
         t.Fatalf("empty cluster allocate returned allocation: %s", result)
     }
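The test-side changes above are mechanical: every three-line "if x == nil { t.Fatal(...) }" guard becomes a single assert.Assert call. A standalone sketch of the two equivalent forms follows, assuming the gotest.tools/v3 assert package that the existing assert.NilError and assert.Equal calls in these tests suggest; newPartition is a hypothetical stand-in so the snippet compiles on its own.

package example

import (
	"testing"

	"gotest.tools/v3/assert"
)

// newPartition is a hypothetical stand-in used only to make this snippet
// self-contained; it is not a real scheduler helper.
func newPartition() *struct{} { return &struct{}{} }

func TestNilGuardStyles(t *testing.T) {
	partition := newPartition()

	// Old style: three lines per guard, stops the test on nil.
	if partition == nil {
		t.Fatal("partition create failed")
	}

	// New style used throughout the diff: one line, same fail-fast behaviour.
	assert.Assert(t, partition != nil, "partition create failed")
}

Besides being shorter, the assert form keeps the failure message next to the condition, which is why the same replacement can be applied uniformly across all of these tests.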
nil") - } + assert.Assert(t, node2 != nil, "expected node-2 to be returned got nil") partition.reserve(app, node2, ask) if app.NodeReservedForAsk(allocKey) != nodeID2 { t.Fatal("reservation failure for alloc-1 and node-2") @@ -2494,9 +2463,7 @@ func TestTryAllocateWithReserved(t *testing.T) { func TestScheduleRemoveReservedAsk(t *testing.T) { setupUGM() partition := createQueuesNodes(t) - if partition == nil { - t.Fatal("partition create failed") - } + assert.Assert(t, partition != nil, "partition create failed") if result := partition.tryAllocate(); result != nil { t.Fatalf("empty cluster allocate returned allocation: %s", result) } @@ -2584,9 +2551,7 @@ func TestScheduleRemoveReservedAsk(t *testing.T) { // update the config with nodes registered and make sure that the root max and guaranteed are not changed func TestUpdateRootQueue(t *testing.T) { partition := createQueuesNodes(t) - if partition == nil { - t.Fatal("partition create failed") - } + assert.Assert(t, partition != nil, "partition create failed") res, err := resources.NewResourceFromConf(map[string]string{"vcore": "20"}) assert.NilError(t, err, "resource creation failed") assert.Assert(t, resources.Equals(res, partition.totalPartitionResource), "partition resource not set as expected") @@ -3890,9 +3855,7 @@ func TestGetNodeSortingPolicyWhenNewPartitionFromConfig(t *testing.T) { func TestTryAllocateMaxRunning(t *testing.T) { const resType = "vcore" partition := createQueuesNodes(t) - if partition == nil { - t.Fatal("partition create failed") - } + assert.Assert(t, partition != nil, "partition create failed") if result := partition.tryAllocate(); result != nil { t.Fatalf("empty cluster allocate returned allocation: %s", result) } diff --git a/pkg/scheduler/tests/recovery_test.go b/pkg/scheduler/tests/recovery_test.go index eeb9b8f4..c7d21912 100644 --- a/pkg/scheduler/tests/recovery_test.go +++ b/pkg/scheduler/tests/recovery_test.go @@ -675,9 +675,7 @@ func TestAppRecovery(t *testing.T) { mockRM.waitForAcceptedNode(t, "node-2:1234", 1000) app := serviceContext.Scheduler.GetClusterContext().GetApplication(appID1, "[rm:123]default") - if app == nil { - t.Fatal("application not found after recovery") - } + assert.Assert(t, app != nil, "application not found after recovery") assert.Equal(t, app.ApplicationID, appID1) assert.Equal(t, app.GetQueuePath(), "root.a") } --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@yunikorn.apache.org For additional commands, e-mail: issues-h...@yunikorn.apache.org