This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/branch-1.6 by this push:
new 37c1ee86 [YUNIKORN-3036] fix race prevention regression (#1014)
37c1ee86 is described below
commit 37c1ee86264d4b298fbb641589557291eabb74a0
Author: Wilfred Spiegelenburg <[email protected]>
AuthorDate: Mon Mar 3 11:26:58 2025 -0600
[YUNIKORN-3036] fix race prevention regression (#1014)
Reinstate the change from YUNIKORN-1993 that was reverted as part of
YUNIKORN-2658. Add a more explicit comment warning not to move the code.
Minor cleanup of function naming, and shorten removeAllocation so it
passes the function-length lint check without a nolint directive.
Closes: #1014
Signed-off-by: Craig Condit <[email protected]>
(cherry picked from commit 9eee423b3df00cc9dbe98ae05888ddf525753a07)
---
pkg/scheduler/partition.go | 54 +++++++++++++++------------
pkg/scheduler/partition_test.go | 71 +++++++++---------------------------
pkg/scheduler/tests/recovery_test.go | 4 +-
3 files changed, 48 insertions(+), 81 deletions(-)
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 47c0d30c..b4b6e326 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -1354,7 +1354,9 @@ func (pc *PartitionContext) calculateNodesResourceUsage()
map[string][]int {
return mapResult
}
-func (pc *PartitionContext) generateReleased(release *si.AllocationRelease,
app *objects.Application) []*objects.Allocation {
+// processAllocationRelease processes the releases from the RM and removes the
allocation(s) as requested.
+// Updates the application which can trigger an application state change.
+func (pc *PartitionContext) processAllocationRelease(release
*si.AllocationRelease, app *objects.Application) []*objects.Allocation {
released := make([]*objects.Allocation, 0)
// when allocationKey is not specified, remove all allocations from the
app
allocationKey := release.GetAllocationKey()
@@ -1386,7 +1388,7 @@ func (pc *PartitionContext) generateReleased(release
*si.AllocationRelease, app
// removeAllocation removes the referenced allocation(s) from the applications
and nodes
// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
-func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease)
([]*objects.Allocation, *objects.Allocation) { //nolint:funlen
+func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease)
([]*objects.Allocation, *objects.Allocation) {
if release == nil {
return nil, nil
}
@@ -1402,25 +1404,20 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
return nil, nil
}
- // temp store for allocations manipulated
- released := pc.generateReleased(release, app)
- var confirmed *objects.Allocation
+ // **** DO NOT MOVE **** this must be called before any allocations are
released.
+ // Processing a removal while in the Completing state could race with
the state change. The race occurs between
+ // removing the allocation and updating the queue after node
processing. If the state change removes the queue link
+ // before we get to updating the queue after the node we leave the
resources as allocated on the queue. The queue
+ // will always exist at this point. Retrieving the queue now sidesteps
this.
+ queue := app.GetQueue()
- // all releases are collected: placeholder count needs updating for all
placeholder releases
- // regardless of what happens later
- phReleases := 0
- for _, r := range released {
- if r.IsPlaceholder() {
- phReleases++
- }
- }
- if phReleases > 0 {
- pc.decPhAllocationCount(phReleases)
- }
+ released := pc.processAllocationRelease(release, app)
+ pc.updatePhAllocationCount(released)
- // for each allocation to release, update the node and queue.
total := resources.NewResource()
totalPreempting := resources.NewResource()
+ var confirmed *objects.Allocation
+ // for each allocation to release, update the node and queue.
for _, alloc := range released {
node := pc.GetNode(alloc.GetNodeID())
if node == nil {
@@ -1471,13 +1468,6 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
}
}
- // Processing a removal while in the Completing state could race with
the state change.
- // The race occurs between removing the allocation and updating the
queue after node processing.
- // If the state change removes the queue link before we get to updating
the queue after the node we
- // leave the resources as allocated on the queue. The queue cannot be
removed yet at this point as
- // there are still allocations left. So retrieve the queue early to
sidestep the race.
- queue := app.GetQueue()
-
if resources.StrictlyGreaterThanZero(total) {
if err := queue.DecAllocatedResource(total); err != nil {
log.Log(log.SchedPartition).Warn("failed to release
resources from queue",
@@ -1515,6 +1505,22 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
return released, confirmed
}
+// updatePhAllocationCount checks the released allocations and updates the
partition context counter of allocated
+// placeholders.
+func (pc *PartitionContext) updatePhAllocationCount(released
[]*objects.Allocation) {
+ // all releases are collected: placeholder count needs updating for all
placeholder releases
+ // regardless of what happens later
+ phReleases := 0
+ for _, a := range released {
+ if a.IsPlaceholder() {
+ phReleases++
+ }
+ }
+ if phReleases > 0 {
+ pc.decPhAllocationCount(phReleases)
+ }
+}
+
func (pc *PartitionContext) GetCurrentState() string {
return pc.stateMachine.Current()
}
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index c6351e0a..8e88bd52 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1499,9 +1499,7 @@ func TestGetQueue(t *testing.T) {
func TestTryAllocate(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -1578,12 +1576,7 @@ func TestTryAllocate(t *testing.T) {
func TestRequiredNodeReservation(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
- if result := partition.tryAllocate(); result != nil {
- t.Fatalf("empty cluster allocate returned allocation: %s",
result)
- }
+ assert.Assert(t, partition != nil, "partition create failed")
node := partition.nodes.GetNode(nodeID1)
if node == nil {
t.Fatal("node-1 should have been created")
@@ -1664,9 +1657,7 @@ func TestRequiredNodeReservation(t *testing.T) {
// allocate ask request with required node having non daemon set reservations
func TestRequiredNodeCancelOtherReservations(t *testing.T) {
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -1744,9 +1735,7 @@ func TestRequiredNodeCancelOtherReservations(t
*testing.T) {
// allocate ask request with required node having daemon set reservations
func TestRequiredNodeCancelDSReservations(t *testing.T) {
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -1829,9 +1818,7 @@ func TestRequiredNodeCancelDSReservations(t *testing.T) {
func TestRequiredNodeNotExist(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -1866,9 +1853,7 @@ func TestRequiredNodeNotExist(t *testing.T) {
// basic ds scheduling on specific node in first allocate run itself (without
any need for reservation)
func TestRequiredNodeAllocation(t *testing.T) {
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result.Request.String())
}
@@ -2037,9 +2022,7 @@ func TestPreemptionForRequiredNodeReservedAlloc(t
*testing.T) {
func TestPreemptionForRequiredNodeMultipleAttemptsAvoided(t *testing.T) {
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
app, testHandler := newApplicationWithHandler(appID1, "default",
"root.parent.sub-leaf")
res, err := resources.NewResourceFromConf(map[string]string{"vcore":
"8"})
@@ -2119,9 +2102,7 @@ func
getExpectedQueuesLimitsForPreemptionWithRequiredNode() map[string]map[strin
// setup the partition with existing allocations so we can test preemption
func setupPreemption(t *testing.T) (*PartitionContext, *objects.Application,
*objects.Application, *objects.Allocation, *objects.Allocation) {
partition := createPreemptionQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -2181,9 +2162,7 @@ func setupPreemption(t *testing.T) (*PartitionContext,
*objects.Application, *ob
// setup the partition in a state that we need for multiple tests
func setupPreemptionForRequiredNode(t *testing.T) (*PartitionContext,
*objects.Application) {
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -2261,9 +2240,7 @@ func setupPreemptionForRequiredNode(t *testing.T)
(*PartitionContext, *objects.A
func TestTryAllocateLarge(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -2294,9 +2271,7 @@ func TestTryAllocateLarge(t *testing.T) {
func TestAllocReserveNewNode(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned result: %s", result)
}
@@ -2365,9 +2340,7 @@ func TestAllocReserveNewNode(t *testing.T) {
func TestTryAllocateReserve(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryReservedAllocate(); result != nil {
t.Fatalf("empty cluster reserved allocate returned allocation:
%s", result)
}
@@ -2439,9 +2412,7 @@ func TestTryAllocateReserve(t *testing.T) {
func TestTryAllocateWithReserved(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if alloc := partition.tryReservedAllocate(); alloc != nil {
t.Fatalf("empty cluster reserved allocate returned allocation:
%v", alloc)
}
@@ -2463,9 +2434,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
// reserve one node: scheduling should happen on the other
node2 := partition.GetNode(nodeID2)
- if node2 == nil {
- t.Fatal("expected node-2 to be returned got nil")
- }
+ assert.Assert(t, node2 != nil, "expected node-2 to be returned got nil")
partition.reserve(app, node2, ask)
if app.NodeReservedForAsk(allocKey) != nodeID2 {
t.Fatal("reservation failure for alloc-1 and node-2")
@@ -2494,9 +2463,7 @@ func TestTryAllocateWithReserved(t *testing.T) {
func TestScheduleRemoveReservedAsk(t *testing.T) {
setupUGM()
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
@@ -2584,9 +2551,7 @@ func TestScheduleRemoveReservedAsk(t *testing.T) {
// update the config with nodes registered and make sure that the root max and
guaranteed are not changed
func TestUpdateRootQueue(t *testing.T) {
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
res, err := resources.NewResourceFromConf(map[string]string{"vcore":
"20"})
assert.NilError(t, err, "resource creation failed")
assert.Assert(t, resources.Equals(res,
partition.totalPartitionResource), "partition resource not set as expected")
@@ -3890,9 +3855,7 @@ func TestGetNodeSortingPolicyWhenNewPartitionFromConfig(t
*testing.T) {
func TestTryAllocateMaxRunning(t *testing.T) {
const resType = "vcore"
partition := createQueuesNodes(t)
- if partition == nil {
- t.Fatal("partition create failed")
- }
+ assert.Assert(t, partition != nil, "partition create failed")
if result := partition.tryAllocate(); result != nil {
t.Fatalf("empty cluster allocate returned allocation: %s",
result)
}
diff --git a/pkg/scheduler/tests/recovery_test.go
b/pkg/scheduler/tests/recovery_test.go
index eeb9b8f4..c7d21912 100644
--- a/pkg/scheduler/tests/recovery_test.go
+++ b/pkg/scheduler/tests/recovery_test.go
@@ -675,9 +675,7 @@ func TestAppRecovery(t *testing.T) {
mockRM.waitForAcceptedNode(t, "node-2:1234", 1000)
app :=
serviceContext.Scheduler.GetClusterContext().GetApplication(appID1,
"[rm:123]default")
- if app == nil {
- t.Fatal("application not found after recovery")
- }
+ assert.Assert(t, app != nil, "application not found after recovery")
assert.Equal(t, app.ApplicationID, appID1)
assert.Equal(t, app.GetQueuePath(), "root.a")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]