This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new fde27e55 [YUNIKORN-2057] Optimize FindQueueByAppID (#1037)
fde27e55 is described below
commit fde27e5573413a1d6d10f3ef89b3122a30765f8a
Author: Adrian Lin <[email protected]>
AuthorDate: Mon Oct 27 16:15:26 2025 +0100
[YUNIKORN-2057] Optimize FindQueueByAppID (#1037)
Closes: #1037
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/app_queue_mapping.go | 51 ++++
pkg/scheduler/objects/app_queue_mapping_test.go | 48 +++
pkg/scheduler/objects/application.go | 2 +-
pkg/scheduler/objects/application_test.go | 44 +--
pkg/scheduler/objects/preemption.go | 2 +-
pkg/scheduler/objects/preemption_test.go | 374 ++++++++++++++----------
pkg/scheduler/objects/queue.go | 45 +--
pkg/scheduler/objects/queue_test.go | 74 ++---
pkg/scheduler/objects/utilities_test.go | 24 +-
pkg/scheduler/partition.go | 15 +-
pkg/scheduler/partition_test.go | 2 +-
pkg/scheduler/placement/testrule_test.go | 4 +-
12 files changed, 420 insertions(+), 265 deletions(-)
diff --git a/pkg/scheduler/objects/app_queue_mapping.go
b/pkg/scheduler/objects/app_queue_mapping.go
new file mode 100644
index 00000000..0dac1edc
--- /dev/null
+++ b/pkg/scheduler/objects/app_queue_mapping.go
@@ -0,0 +1,51 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package objects
+
+import "github.com/apache/yunikorn-core/pkg/locking"
+
+// AppQueueMapping maintains a mapping between application IDs and their
corresponding queues.
+type AppQueueMapping struct {
+ byAppID map[string]*Queue
+ locking.RWMutex
+}
+
+func NewAppQueueMapping() *AppQueueMapping {
+ return &AppQueueMapping{
+ byAppID: make(map[string]*Queue),
+ }
+}
+
+func (aqm *AppQueueMapping) AddAppQueueMapping(appID string, queue *Queue) {
+ aqm.Lock()
+ defer aqm.Unlock()
+ aqm.byAppID[appID] = queue
+}
+
+func (aqm *AppQueueMapping) GetQueueByAppId(appID string) *Queue {
+ aqm.RLock()
+ defer aqm.RUnlock()
+ return aqm.byAppID[appID]
+}
+
+func (aqm *AppQueueMapping) RemoveAppQueueMapping(appID string) {
+ aqm.Lock()
+ defer aqm.Unlock()
+ delete(aqm.byAppID, appID)
+}
diff --git a/pkg/scheduler/objects/app_queue_mapping_test.go
b/pkg/scheduler/objects/app_queue_mapping_test.go
new file mode 100644
index 00000000..56d9b44a
--- /dev/null
+++ b/pkg/scheduler/objects/app_queue_mapping_test.go
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package objects
+
+import (
+ "testing"
+
+ "gotest.tools/v3/assert"
+)
+
+func TestNewAppQueueMapping(t *testing.T) {
+ aqm := NewAppQueueMapping()
+ assert.Assert(t, aqm != nil, "expected non-nil AppQueueMapping")
+ assert.Equal(t, 0, len(aqm.byAppID), "expected empty byAppID map")
+}
+func TestAppQueueMappingOperations(t *testing.T) {
+ aqm := NewAppQueueMapping()
+ queue := &Queue{}
+ appID := "app-1234"
+
+ // Test AddAppQueueMapping
+ aqm.AddAppQueueMapping(appID, queue)
+ assert.Equal(t, 1, len(aqm.byAppID), "expected 1 entry in byAppID map")
+
+ // Test FindQueueByAppID
+ foundQueue := aqm.GetQueueByAppId(appID)
+ assert.Equal(t, foundQueue, queue, "expected to find the correct queue
for appID %s", appID)
+
+ // Test RemoveAppQueueMapping
+ aqm.RemoveAppQueueMapping(appID)
+ assert.Equal(t, 0, len(aqm.byAppID), "expected 0 entries in byAppID map
after removal")
+}
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index db7929f9..62d7305d 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -1395,7 +1395,7 @@ func (sa *Application) tryRequiredNodePreemption(reserve
*reservation, ask *Allo
zap.String("allocation name", ask.GetAllocationName()),
zap.Int("no.of victims", len(victims)))
for _, victim := range victims {
- if victimQueue :=
sa.queue.FindQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
+ if victimQueue :=
sa.queue.GetQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
}
victim.MarkPreempted()
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 20715e0c..d7241ea0 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -1040,7 +1040,7 @@ func TestUpdateAllocationResourcePending(t *testing.T) {
app := newApplication(appID1, "default", "root.a")
root, err := createRootQueue(nil)
assert.NilError(t, err, "failed to create root queue")
- queue, err := createDynamicQueue(root, "test", false)
+ queue, err := createDynamicQueue(root, "test", false, nil)
assert.NilError(t, err, "failed to create test queue")
app.SetQueue(queue)
@@ -1088,7 +1088,7 @@ func TestUpdateAllocationResourceAllocated(t *testing.T) {
app := newApplication(appID1, "default", "root.a")
root, err := createRootQueue(nil)
assert.NilError(t, err, "failed to create root queue")
- queue, err := createDynamicQueue(root, "test", false)
+ queue, err := createDynamicQueue(root, "test", false, nil)
assert.NilError(t, err, "failed to create test queue")
app.SetQueue(queue)
@@ -1138,7 +1138,7 @@ func TestQueueUpdate(t *testing.T) {
root, err := createRootQueue(nil)
assert.NilError(t, err, "failed to create root queue")
- queue, err := createDynamicQueue(root, "test", false)
+ queue, err := createDynamicQueue(root, "test", false, nil)
assert.NilError(t, err, "failed to create test queue")
app.SetQueue(queue)
assert.Equal(t, app.GetQueuePath(), "root.test")
@@ -2195,19 +2195,21 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
getNode := func(nodeID string) *Node {
return nodeMap[nodeID]
}
+ appQueueMapping := NewAppQueueMapping()
rootQ, err := createRootQueue(map[string]string{"first": "20"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"})
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"},
appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, map[string]string{"first": "5"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, map[string]string{"first": "5"}, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "5"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "5"}, appQueueMapping)
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
- childQ1.applications[appID1] = app1
+ childQ1.AddApplication(app1)
+ appQueueMapping.AddAppQueueMapping(appID1, childQ1)
ask1 := newAllocationAsk("alloc1", appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
err = app1.AddAllocationAsk(ask1)
assert.NilError(t, err)
@@ -2217,7 +2219,8 @@ func TestTryAllocatePreemptQueue(t *testing.T) {
app2 := newApplication(appID2, "default", "root.parent.child2")
app2.SetQueue(childQ2)
- childQ2.applications[appID2] = app2
+ childQ2.AddApplication(app2)
+ appQueueMapping.AddAppQueueMapping(appID2, childQ2)
ask3 := newAllocationAsk("alloc3", appID2,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask3.allowPreemptOther = true
err = app2.AddAllocationAsk(ask3)
@@ -2276,20 +2279,23 @@ func createPreemptNodeTestSetup(t *testing.T) (func()
NodeIterator, func(NodeID
return nodeMap[nodeID]
}
+ appQueueMapping := NewAppQueueMapping()
+
rootQ, err := createRootQueue(map[string]string{"first": "40"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"})
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"},
appQueueMapping)
assert.NilError(t, err)
- unlimitedQ, err := createManagedQueueGuaranteed(rootQ, "unlimited",
false, nil, nil)
+ unlimitedQ, err := createManagedQueueGuaranteed(rootQ, "unlimited",
false, nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, map[string]string{"first": "5"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, map[string]string{"first": "5"}, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "5"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "5"}, appQueueMapping)
assert.NilError(t, err)
app0 := newApplication(appID0, "default", "root.unlimited")
app0.SetQueue(unlimitedQ)
- unlimitedQ.applications[appID0] = app0
+ unlimitedQ.AddApplication(app0)
+ appQueueMapping.AddAppQueueMapping(appID0, unlimitedQ)
ask00 := newAllocationAsk("alloc0-0", appID0,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11}))
err = app0.AddAllocationAsk(ask00)
assert.NilError(t, err)
@@ -2299,7 +2305,8 @@ func createPreemptNodeTestSetup(t *testing.T) (func()
NodeIterator, func(NodeID
app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
- childQ1.applications[appID1] = app1
+ childQ1.AddApplication(app1)
+ appQueueMapping.AddAppQueueMapping(appID1, childQ1)
ask1 := newAllocationAsk("alloc1", appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
err = app1.AddAllocationAsk(ask1)
assert.NilError(t, err)
@@ -2309,7 +2316,8 @@ func createPreemptNodeTestSetup(t *testing.T) (func()
NodeIterator, func(NodeID
app2 := newApplication(appID2, "default", "root.parent.child2")
app2.SetQueue(childQ2)
- childQ2.applications[appID2] = app2
+ childQ2.AddApplication(app2)
+ appQueueMapping.AddAppQueueMapping(appID2, childQ2)
ask3 := newAllocationAsk("alloc3", appID2,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask3.allowPreemptOther = true
err = app2.AddAllocationAsk(ask3)
@@ -2358,7 +2366,7 @@ func createPreemptNodeWithReservationsTestSetup(t
*testing.T) (func() NodeIterat
app3 := newApplication(appID3, "default", "root.parent.child2")
app3.SetQueue(childQ2)
- childQ2.applications[appID3] = app3
+ childQ2.AddApplication(app3)
ask4 := newAllocationAsk("alloc4", appID3,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask4.allowPreemptOther = true
ask4.priority = math.MaxInt32
@@ -3257,13 +3265,15 @@ func TestRequiredNodePreemption(t *testing.T) {
getNode := func(nodeID string) *Node {
return node
}
+ appQueueMapping := NewAppQueueMapping()
// set queue
rootQ, err := createRootQueue(map[string]string{"first": "20"})
assert.NilError(t, err)
- childQ, err := createManagedQueue(rootQ, "default", false,
map[string]string{"first": "20"})
+ childQ, err := createManagedQueueWithAppQueueMapping(rootQ, "default",
false, map[string]string{"first": "20"}, appQueueMapping)
assert.NilError(t, err)
app.SetQueue(childQ)
+ appQueueMapping.AddAppQueueMapping(app.ApplicationID, childQ)
// add an ask
mockEvents := mock.NewEventSystem()
diff --git a/pkg/scheduler/objects/preemption.go
b/pkg/scheduler/objects/preemption.go
index 8751000f..555b9cea 100644
--- a/pkg/scheduler/objects/preemption.go
+++ b/pkg/scheduler/objects/preemption.go
@@ -649,7 +649,7 @@ func (p *Preemptor) TryPreemption() (*AllocationResult,
bool) {
// preempt the victims
for _, victim := range finalVictims {
- if victimQueue :=
p.queue.FindQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
+ if victimQueue :=
p.queue.GetQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
victim.MarkPreempted()
log.Log(log.SchedPreemption).Info("Preempting task",
diff --git a/pkg/scheduler/objects/preemption_test.go
b/pkg/scheduler/objects/preemption_test.go
index a88e1bdc..4cbf939a 100644
--- a/pkg/scheduler/objects/preemption_test.go
+++ b/pkg/scheduler/objects/preemption_test.go
@@ -42,10 +42,12 @@ func creatApp1(
node1 *Node,
node2 *Node,
app1Rec map[string]resources.Quantity,
+ appQueueMapping *AppQueueMapping,
) (*Allocation, *Allocation, error) {
app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
- childQ1.applications[appID1] = app1
+ childQ1.AddApplication(app1)
+ appQueueMapping.AddAppQueueMapping(app1.ApplicationID, childQ1)
ask1 := newAllocationAsk("alloc1", appID1,
resources.NewResourceFromMap(app1Rec))
ask1.createTime = time.Now().Add(-1 * time.Minute)
@@ -94,10 +96,12 @@ func creatApp2(
childQ2 *Queue,
app2Res map[string]resources.Quantity,
allocID string,
+ appQueueMapping *AppQueueMapping,
) (*Application, *Allocation, error) {
app2 := newApplication(appID2, "default", "root.parent.child2")
app2.SetQueue(childQ2)
- childQ2.applications[appID2] = app2
+ childQ2.AddApplication(app2)
+ appQueueMapping.AddAppQueueMapping(app2.ApplicationID, childQ2)
ask3 := newAllocationAsk(allocID, appID2,
resources.NewResourceFromMap(app2Res))
if err := app2.AddAllocationAsk(ask3); err != nil {
return nil, nil, err
@@ -115,7 +119,7 @@ func TestCheckPreconditions(t *testing.T) {
assert.NilError(t, err)
app := newApplication(appID1, "default", "root.child")
app.SetQueue(childQ)
- childQ.applications[appID1] = app
+ childQ.AddApplication(app)
ask := newAllocationAsk("alloc1", appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask.allowPreemptOther = true
ask.createTime = time.Now().Add(-1 * time.Minute)
@@ -171,23 +175,24 @@ func TestCheckPreconditions(t *testing.T) {
}
func TestCheckPreemptionQueueGuarantees(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode("node1", map[string]resources.Quantity{"first": 20})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"first": "20"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"})
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"},
appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"},
appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"},
appQueueMapping)
assert.NilError(t, err)
- alloc1, alloc2, err := creatApp1(childQ1, node, nil,
map[string]resources.Quantity{"first": 5})
+ alloc1, alloc2, err := creatApp1(childQ1, node, nil,
map[string]resources.Quantity{"first": 5}, appQueueMapping)
assert.NilError(t, err)
assert.Assert(t, alloc1 != nil, "alloc1 should not be nil")
assert.Assert(t, alloc2 != nil, "alloc2 should not be nil")
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5}, "alloc3")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5}, "alloc3", appQueueMapping)
assert.NilError(t, err)
childQ2.incPendingResource(ask3.GetAllocatedResource())
@@ -219,23 +224,24 @@ func
TestCheckPreemptionQueueGuaranteesWithNoGuaranteedResources(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode("node1",
map[string]resources.Quantity{"first": 20})
iterator := getNodeIteratorFn(node)
rootQ, err :=
createRootQueue(map[string]string{"first": "20"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ,
"parent", true, map[string]string{}, tt.parentGuaranteed)
+ parentQ, err := createManagedQueueGuaranteed(rootQ,
"parent", true, map[string]string{}, tt.parentGuaranteed, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ,
"child1", false, map[string]string{}, map[string]string{"first": "5"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ,
"child1", false, map[string]string{}, map[string]string{"first": "5"},
appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ,
"child2", false, map[string]string{}, tt.childGuaranteed)
+ childQ2, err := createManagedQueueGuaranteed(parentQ,
"child2", false, map[string]string{}, tt.childGuaranteed, appQueueMapping)
assert.NilError(t, err)
- alloc1, alloc2, err := creatApp1(childQ1, node, nil,
map[string]resources.Quantity{"first": 5})
+ alloc1, alloc2, err := creatApp1(childQ1, node, nil,
map[string]resources.Quantity{"first": 5}, appQueueMapping)
assert.NilError(t, err)
assert.Assert(t, alloc1 != nil, "alloc1 should not be
nil")
assert.Assert(t, alloc2 != nil, "alloc2 should not be
nil")
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5}, "alloc3")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5}, "alloc3", appQueueMapping)
assert.NilError(t, err)
childQ2.incPendingResource(ask3.GetAllocatedResource())
@@ -256,21 +262,22 @@ func
TestCheckPreemptionQueueGuaranteesWithNoGuaranteedResources(t *testing.T) {
}
func TestTryPreemption(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"first": 10,
"pods": 5})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"first": "20", "pods":
"5"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"})
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"},
appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"},
appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"},
appQueueMapping)
assert.NilError(t, err)
- alloc1, alloc2, err := creatApp1(childQ1, node, nil,
map[string]resources.Quantity{"first": 5, "pods": 1})
+ alloc1, alloc2, err := creatApp1(childQ1, node, nil,
map[string]resources.Quantity{"first": 5, "pods": 1}, appQueueMapping)
assert.NilError(t, err)
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5, "pods": 1}, "alloc3")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5, "pods": 1}, "alloc3", appQueueMapping)
assert.NilError(t, err)
childQ2.incPendingResource(ask3.GetAllocatedResource())
@@ -296,25 +303,26 @@ func TestTryPreemption(t *testing.T) {
}
func TestTryPreemption_SendEvent(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"first": 10,
"pods": 5})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"first": "20", "pods":
"5"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"})
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"},
appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"},
appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"},
appQueueMapping)
assert.NilError(t, err)
- alloc1, alloc2, err := creatApp1(childQ1, node, nil,
map[string]resources.Quantity{"first": 5, "pods": 1})
+ alloc1, alloc2, err := creatApp1(childQ1, node, nil,
map[string]resources.Quantity{"first": 5, "pods": 1}, appQueueMapping)
assert.NilError(t, err)
eventSystem := evtMock.NewEventSystem()
events := schedEvt.NewAskEvents(eventSystem)
alloc1.askEvents = events
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5, "pods": 1}, "alloc3")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5, "pods": 1}, "alloc3", appQueueMapping)
assert.NilError(t, err)
childQ2.incPendingResource(ask3.GetAllocatedResource())
@@ -357,22 +365,23 @@ func TestTryPreemption_SendEvent(t *testing.T) {
// root.parent.child2. Guaranteed set, first: 5. Request of first:5 is waiting
for resources.
// 1 Allocation on root.parent.child1 should be preempted to free up resources
for ask arrived in root.parent.child2.
func TestTryPreemptionOnNode(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 5,
"pods": 1})
node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 5,
"pods": 1})
iterator := getNodeIteratorFn(node1, node2)
rootQ, err := createRootQueue(map[string]string{"first": "10", "pods":
"2"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"})
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"},
appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"},
appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"},
appQueueMapping)
assert.NilError(t, err)
- alloc1, alloc2, err := creatApp1(childQ1, node1, node2,
map[string]resources.Quantity{"first": 5, "pods": 1})
+ alloc1, alloc2, err := creatApp1(childQ1, node1, node2,
map[string]resources.Quantity{"first": 5, "pods": 1}, appQueueMapping)
assert.NilError(t, err)
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5, "pods": 1}, "alloc3")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5, "pods": 1}, "alloc3", appQueueMapping)
assert.NilError(t, err)
headRoom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods":
3})
@@ -406,22 +415,23 @@ func TestTryPreemptionOnNode(t *testing.T) {
// root.parent.child2. Guaranteed set, first: 6. Request of first:6 is waiting
for resources.
// Nome of the node would be considered for preemption as ask requirements is
higher than the node capacity. Hence, no results.
func TestTryPreemption_NodeWithCapacityLesserThanAsk(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 5,
"pods": 1})
node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 5,
"pods": 1})
iterator := getNodeIteratorFn(node1, node2)
rootQ, err := createRootQueue(map[string]string{"first": "10", "pods":
"2"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"})
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "10"},
appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
map[string]string{"first": "10"}, map[string]string{"first": "5"},
appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
map[string]string{"first": "10"}, map[string]string{"first": "6"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
map[string]string{"first": "10"}, map[string]string{"first": "6"},
appQueueMapping)
assert.NilError(t, err)
- alloc1, alloc2, err := creatApp1(childQ1, node1, node2,
map[string]resources.Quantity{"first": 5, "pods": 1})
+ alloc1, alloc2, err := creatApp1(childQ1, node1, node2,
map[string]resources.Quantity{"first": 5, "pods": 1}, appQueueMapping)
assert.NilError(t, err)
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 6, "pods": 1}, "alloc3")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 6, "pods": 1}, "alloc3", appQueueMapping)
assert.NilError(t, err)
headRoom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods":
3})
@@ -444,20 +454,22 @@ func TestTryPreemption_NodeWithCapacityLesserThanAsk(t
*testing.T) {
// root.parent.child2. Guaranteed set, first: 1. Ask of first:1 is waiting for
resources.
// 1 Allocation on root.parent.child1 should be preempted to free up resources
for ask arrived in root.parent.child2.
func TestTryPreemptionOnNodeWithOGParentAndUGPreemptor(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 3,
"pods": 1})
node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 3,
"pods": 1})
iterator := getNodeIteratorFn(node1, node2)
rootQ, err := createRootQueue(map[string]string{"first": "6", "pods":
"2"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "2"})
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "20"}, map[string]string{"first": "2"},
appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, nil)
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
map[string]string{"first": "10"}, map[string]string{"first": "1"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
map[string]string{"first": "10"}, map[string]string{"first": "1"},
appQueueMapping)
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
- childQ1.applications[appID1] = app1
+ childQ1.AddApplication(app1)
+ appQueueMapping.AddAppQueueMapping(app1.ApplicationID, childQ1)
for i := 1; i <= 6; i++ {
ask1 := newAllocationAsk("alloc"+strconv.Itoa(i), appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}))
@@ -478,7 +490,7 @@ func TestTryPreemptionOnNodeWithOGParentAndUGPreemptor(t
*testing.T) {
}
}
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 1}, "alloc7")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 1}, "alloc7", appQueueMapping)
assert.NilError(t, err)
headRoom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods":
3})
@@ -511,22 +523,23 @@ func TestTryPreemptionOnNodeWithOGParentAndUGPreemptor(t
*testing.T) {
// root.parent.child2. Guaranteed set, first: 5. Request of first:5 is waiting
for resources.
// 1 Allocation on root.parent.child1 should be preempted to free up resources
for ask arrived in root.parent.child2.
func TestTryPreemptionOnQueue(t *testing.T) {
+ appQueneMapping := NewAppQueueMapping()
node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 10,
"pods": 2})
node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 10,
"pods": 2})
iterator := getNodeIteratorFn(node1, node2)
rootQ, err := createRootQueue(map[string]string{"first": "20", "pods":
"4"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "10"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "10"}, nil, appQueneMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, map[string]string{"first": "5"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, map[string]string{"first": "5"}, appQueneMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "5"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "5"}, appQueneMapping)
assert.NilError(t, err)
- alloc1, alloc2, err := creatApp1(childQ1, node1, node2,
map[string]resources.Quantity{"first": 5, "pods": 1})
+ alloc1, alloc2, err := creatApp1(childQ1, node1, node2,
map[string]resources.Quantity{"first": 5, "pods": 1}, appQueneMapping)
assert.NilError(t, err)
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5, "pods": 1}, "alloc3")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5, "pods": 1}, "alloc3", appQueneMapping)
assert.NilError(t, err)
headRoom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods":
3})
@@ -559,22 +572,23 @@ func TestTryPreemptionOnQueue(t *testing.T) {
// root.parent.child2. Guaranteed set, first: 5. Request of first:5 is waiting
for resources.
// 2 Allocation on root.parent.child1 has been found and considered as
victims. Since victims total resource usage (first: 4) is lesser than ask
requirment (first: 5), preemption won't help. Hence, victims are dropped.
func TestTryPreemption_VictimsAvailable_InsufficientResource(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 10,
"pods": 2})
node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 10,
"pods": 2})
iterator := getNodeIteratorFn(node1, node2)
rootQ, err := createRootQueue(map[string]string{"first": "20", "pods":
"4"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "8"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "8"}, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, nil)
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "5"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "5"}, appQueueMapping)
assert.NilError(t, err)
- alloc1, alloc2, err := creatApp1(childQ1, node1, node2,
map[string]resources.Quantity{"first": 2, "pods": 1})
+ alloc1, alloc2, err := creatApp1(childQ1, node1, node2,
map[string]resources.Quantity{"first": 2, "pods": 1}, appQueueMapping)
assert.NilError(t, err)
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5}, "alloc3")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5}, "alloc3", appQueueMapping)
assert.NilError(t, err)
headRoom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods":
3})
@@ -599,22 +613,23 @@ func
TestTryPreemption_VictimsAvailable_InsufficientResource(t *testing.T) {
// root.parent.child2. Guaranteed set, first: 5. Request of first:5 is waiting
for resources.
// 2 Allocation on root.parent.child1 has been found and considered as
victims. Since victims total resource usage (first: 4) is lesser than ask
requirment (first: 5), preemption won't help. Hence, victims are dropped.
func TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource(t
*testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 5,
"pods": 1})
node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 5,
"pods": 1})
iterator := getNodeIteratorFn(node1, node2)
rootQ, err := createRootQueue(map[string]string{"first": "10", "pods":
"2"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "6"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "6"}, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, nil)
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "5"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "5"}, appQueueMapping)
assert.NilError(t, err)
- alloc1, alloc2, err := creatApp1(childQ1, node1, node2,
map[string]resources.Quantity{"first": 2, "pods": 1})
+ alloc1, alloc2, err := creatApp1(childQ1, node1, node2,
map[string]resources.Quantity{"first": 2, "pods": 1}, appQueueMapping)
assert.NilError(t, err)
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5}, "alloc3")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5}, "alloc3", appQueueMapping)
assert.NilError(t, err)
headRoom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods":
3})
@@ -649,20 +664,21 @@ func
TestTryPreemption_VictimsOnDifferentNodes_InsufficientResource(t *testing.T
// root.parent.child2. Guaranteed set, first: 5. Request of first:5 is waiting
for resources.
// 2 Allocation on root.parent.child1 has been found and considered as victims
and preempted to free up resources for ask.
func TestTryPreemption_VictimsAvailableOnDifferentNodes(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 5,
"pods": 1})
node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 4,
"pods": 1})
iterator := getNodeIteratorFn(node1, node2)
rootQ, err := createRootQueue(map[string]string{"first": "9", "pods":
"2"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "6"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "6"}, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, nil)
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "5"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "5"}, appQueueMapping)
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
- childQ1.applications[appID1] = app1
+ childQ1.AddApplication(app1)
ask1 := newAllocationAsk("alloc1", appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 4, "pods":
1}))
ask1.createTime = time.Now().Add(-1 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
@@ -672,15 +688,17 @@ func TestTryPreemption_VictimsAvailableOnDifferentNodes(t
*testing.T) {
alloc1 := newAllocationWithKey("alloc1", appID1, nodeID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 4, "pods":
1}))
alloc1.createTime = ask1.createTime
app1.AddAllocation(alloc1)
+ appQueueMapping.AddAppQueueMapping(app1.ApplicationID, childQ1)
assert.Check(t, node1.TryAddAllocation(alloc1), "node alloc1 failed")
alloc2 := newAllocationWithKey("alloc2", appID1, nodeID2,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2, "pods":
1}))
alloc2.createTime = ask2.createTime
app1.AddAllocation(alloc2)
+ appQueueMapping.AddAppQueueMapping(app1.ApplicationID, childQ1)
assert.Check(t, node2.TryAddAllocation(alloc2), "node alloc2 failed")
assert.NilError(t,
childQ1.TryIncAllocatedResource(ask1.GetAllocatedResource()))
assert.NilError(t,
childQ1.TryIncAllocatedResource(ask2.GetAllocatedResource()))
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5}, "alloc3")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 5}, "alloc3", appQueueMapping)
assert.NilError(t, err)
headRoom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods":
3})
@@ -717,26 +735,28 @@ func TestTryPreemption_VictimsAvailableOnDifferentNodes(t
*testing.T) {
// option 1 >> option 2 >> option 3. In option 3, preempting third allocation
is unnecessary, should avoid this option.
// Either option 1 or option2 is fine, but not option 3.
func TestTryPreemption_OnQueue_VictimsOnDifferentNodes(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 30})
node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 30})
iterator := getNodeIteratorFn(node1, node2)
rootQ, err := createRootQueue(map[string]string{"first": "60"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "18"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "18"}, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, nil)
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "15"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "15"}, appQueueMapping)
assert.NilError(t, err)
- childQ3, err := createManagedQueueGuaranteed(parentQ, "child3", false,
nil, nil)
+ childQ3, err := createManagedQueueGuaranteed(parentQ, "child3", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- alloc1, alloc2, err := creatApp1(childQ1, node1, node2,
map[string]resources.Quantity{"first": 5})
+ alloc1, alloc2, err := creatApp1(childQ1, node1, node2,
map[string]resources.Quantity{"first": 5}, appQueueMapping)
assert.NilError(t, err)
app3 := newApplication(appID3, "default", "root.parent.child3")
app3.SetQueue(childQ3)
- childQ3.applications[appID3] = app3
+ childQ3.AddApplication(app3)
+ appQueueMapping.AddAppQueueMapping(app3.ApplicationID, childQ3)
ask4 := newAllocationAsk("alloc4", appID3,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask4.createTime = time.Now()
@@ -748,7 +768,7 @@ func TestTryPreemption_OnQueue_VictimsOnDifferentNodes(t
*testing.T) {
assert.Check(t, node2.TryAddAllocation(alloc4), "node alloc2 failed")
assert.NilError(t,
childQ3.TryIncAllocatedResource(ask4.GetAllocatedResource()))
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 10}, "alloc3")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 10}, "alloc3", appQueueMapping)
assert.NilError(t, err)
headRoom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods":
3})
@@ -786,22 +806,24 @@ func TestTryPreemption_OnQueue_VictimsOnDifferentNodes(t
*testing.T) {
// root.parent.child3. Guaranteed not set. 1 Allocation is running on node2.
Total usage is first:5.
// High priority ask should not be touched and remaining 2 allocs should be
preempted to free up resources
func TestTryPreemption_OnQueue_VictimsAvailable_LowerPriority(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 30})
node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 30})
iterator := getNodeIteratorFn(node1, node2)
rootQ, err := createRootQueue(map[string]string{"first": "60"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "18"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"first": "18"}, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, nil)
+ childQ1, err := createManagedQueueGuaranteed(parentQ, "child1", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "15"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ, "child2", false,
nil, map[string]string{"first": "15"}, appQueueMapping)
assert.NilError(t, err)
- childQ3, err := createManagedQueueGuaranteed(parentQ, "child3", false,
nil, nil)
+ childQ3, err := createManagedQueueGuaranteed(parentQ, "child3", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.child1")
app1.SetQueue(childQ1)
- childQ1.applications[appID1] = app1
+ childQ1.AddApplication(app1)
+ appQueueMapping.AddAppQueueMapping(app1.ApplicationID, childQ1)
ask1 := newAllocationAsk("alloc1", appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}))
ask1.createTime = time.Now().Add(-2 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
@@ -823,7 +845,8 @@ func
TestTryPreemption_OnQueue_VictimsAvailable_LowerPriority(t *testing.T) {
app3 := newApplication(appID3, "default", "root.parent.child3")
app3.SetQueue(childQ3)
- childQ3.applications[appID3] = app3
+ childQ3.AddApplication(app3)
+ appQueueMapping.AddAppQueueMapping(app3.ApplicationID, childQ3)
ask4 := newAllocationAskPriority("alloc4", appID3,
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), 1000)
ask4.createTime = time.Now()
@@ -835,7 +858,7 @@ func
TestTryPreemption_OnQueue_VictimsAvailable_LowerPriority(t *testing.T) {
assert.Check(t, node2.TryAddAllocation(alloc4), "node alloc2 failed")
assert.NilError(t,
childQ3.TryIncAllocatedResource(ask4.GetAllocatedResource()))
- app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 10}, "alloc3")
+ app2, ask3, err := creatApp2(childQ2,
map[string]resources.Quantity{"first": 10}, "alloc3", appQueueMapping)
assert.NilError(t, err)
headRoom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "pods":
3})
@@ -872,27 +895,29 @@ func
TestTryPreemption_OnQueue_VictimsAvailable_LowerPriority(t *testing.T) {
// root.parent.parent2.child3. No usage, no guaranteed set
// 1 Allocation on root.parent.parent1.child2 should be preempted to free up
resources for ask arrived in root.parent.parent1.child1.
func TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(t
*testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 3,
"mem": 400})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"vcores": "3", "mem":
"400"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "2"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "2"}, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, map[string]string{"vcores": "1"})
+ parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, map[string]string{"vcores": "1"}, appQueueMapping)
assert.NilError(t, err)
- parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil)
+ parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, nil)
+ childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, nil)
+ childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil)
+ _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil, appQueueMapping)
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.parent2.child2")
app1.SetQueue(childQ2)
- childQ2.applications[appID1] = app1
+ childQ2.AddApplication(app1)
+ appQueueMapping.AddAppQueueMapping(app1.ApplicationID, childQ2)
ask1 := newAllocationAsk("alloc1", appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1, "mem":
200}))
ask1.createTime = time.Now().Add(-1 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
@@ -911,7 +936,8 @@ func
TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(t *test
assert.NilError(t,
childQ2.TryIncAllocatedResource(ask2.GetAllocatedResource()))
app2 := newApplication(appID2, "default", "root.parent.parent1.child1")
app2.SetQueue(childQ1)
- childQ1.applications[appID2] = app2
+ childQ1.AddApplication(app2)
+ appQueueMapping.AddAppQueueMapping(app2.ApplicationID, childQ1)
ask3 := newAllocationAsk("alloc3", appID2,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
assert.NilError(t, app2.AddAllocationAsk(ask3))
headRoom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 2})
@@ -945,27 +971,29 @@ func
TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(t *test
// root.parent.parent2.child3. No usage, no guaranteed set
// 1 Allocation on root.parent.parent1.child2 should be preempted to free up
resources for ask arrived in root.parent.parent1.child1.
func
TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(t
*testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 2,
"mem": 400})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"vcores": "2", "mem":
"400"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "2"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "2"}, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, map[string]string{"vcores": "1"})
+ parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, map[string]string{"vcores": "1"}, appQueueMapping)
assert.NilError(t, err)
- parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil)
+ parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, nil)
+ childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, nil)
+ childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil)
+ _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil, appQueueMapping)
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.parent2.child2")
app1.SetQueue(childQ2)
- childQ2.applications[appID1] = app1
+ childQ2.AddApplication(app1)
+ appQueueMapping.AddAppQueueMapping(app1.ApplicationID, childQ2)
ask1 := newAllocationAsk("alloc1", appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1, "mem":
200}))
ask1.createTime = time.Now().Add(-1 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
@@ -984,7 +1012,8 @@ func
TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(
assert.NilError(t,
childQ2.TryIncAllocatedResource(ask2.GetAllocatedResource()))
app2 := newApplication(appID2, "default", "root.parent.parent1.child1")
app2.SetQueue(childQ1)
- childQ1.applications[appID2] = app2
+ childQ1.AddApplication(app2)
+ appQueueMapping.AddAppQueueMapping(app2.ApplicationID, childQ1)
ask3 := newAllocationAsk("alloc3", appID2,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
assert.NilError(t, app2.AddAllocationAsk(ask3))
headRoom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 2})
@@ -1021,24 +1050,25 @@ func
TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnPreemptorSide(
//
//nolint:funlen
func
TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnVictimAndPreemptorSides(t
*testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 5,
"mem": 700})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"vcores": "5", "mem":
"700"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "3"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "3"}, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, nil)
+ parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil)
+ parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "2"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "2"}, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, map[string]string{"vcores": "1"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, map[string]string{"vcores": "1"}, appQueueMapping)
assert.NilError(t, err)
- _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil)
+ _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil, appQueueMapping)
assert.NilError(t, err)
- app1, app2, app3 := createVictimApplications(childQ2)
+ app1, app2, app3 := createVictimApplications(childQ2, appQueueMapping)
for i := 5; i < 8; i++ {
askN := newAllocationAsk(alloc+strconv.Itoa(i), appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"mem": 100}))
askN.createTime = time.Now().Add(-2 * time.Minute)
@@ -1118,24 +1148,25 @@ func
TestTryPreemption_AskResTypesDifferent_GuaranteedSetOnVictimAndPreemptorSid
// 3rd allocation of vcores:1, mem: 100 should not be touched as preempting
the same would make usage goes below the guaranteed set on
root.parent.parent2.child2.
// All remaining three allocation of each mem: 100 should not be touched at
all as there is no matching resource type between these allocs and ask resource
types.
func
TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnVictimAndPreemptorSides(t
*testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 3,
"mem": 600})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"vcores": "3", "mem":
"600"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "3"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "3"}, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, nil)
+ parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil)
+ parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "2"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "2"}, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, map[string]string{"vcores": "1"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, map[string]string{"vcores": "1"}, appQueueMapping)
assert.NilError(t, err)
- _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil)
+ _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil, appQueueMapping)
assert.NilError(t, err)
- app1, app2, app3 := createVictimApplications(childQ2)
+ app1, app2, app3 := createVictimApplications(childQ2, appQueueMapping)
for i := 5; i < 8; i++ {
askN := newAllocationAsk(alloc+strconv.Itoa(i), appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"mem": 100}))
askN.createTime = time.Now().Add(-2 * time.Minute)
@@ -1202,16 +1233,21 @@ func
TestTryPreemption_OnNode_AskResTypesDifferent_GuaranteedSetOnVictimAndPreem
assert.Equal(t, len(ask4.GetAllocationLog()), 0)
}
-func createVictimApplications(childQ2 *Queue) (*Application, *Application,
*Application) {
+func createVictimApplications(childQ2 *Queue, appQueueMapping
*AppQueueMapping) (*Application, *Application, *Application) {
app1 := newApplication(appID1, "default", "root.parent.parent2.child2")
app1.SetQueue(childQ2)
- childQ2.applications[appID1] = app1
+ childQ2.AddApplication(app1)
+ appQueueMapping.AddAppQueueMapping(app1.ApplicationID, childQ2)
+
app2 := newApplication(appID2, "default", "root.parent.parent2.child2")
app2.SetQueue(childQ2)
- childQ2.applications[appID2] = app2
+ childQ2.AddApplication(app2)
+ appQueueMapping.AddAppQueueMapping(app2.ApplicationID, childQ2)
+
app3 := newApplication(appID3, "default", "root.parent.parent2.child2")
app3.SetQueue(childQ2)
- childQ2.applications[appID3] = app3
+ childQ2.AddApplication(app3)
+ appQueueMapping.AddAppQueueMapping(app3.ApplicationID, childQ2)
return app1, app2, app3
}
@@ -1229,24 +1265,25 @@ func createVictimApplications(childQ2 *Queue)
(*Application, *Application, *Appl
//
//nolint:funlen
func TestTryPreemption_AskResTypesSame_GuaranteedSetOnPreemptorSide(t
*testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 5,
"gpu": 300, "mem": 200})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"vcores": "5", "gpu":
"300", "mem": "200"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
nil, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, nil)
+ parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil)
+ parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "2"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "2"}, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, nil)
+ childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil)
+ _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil, appQueueMapping)
assert.NilError(t, err)
- app1, app2, app3 := createVictimApplications(childQ2)
+ app1, app2, app3 := createVictimApplications(childQ2, appQueueMapping)
for i := 5; i < 8; i++ {
askN := newAllocationAsk(alloc+strconv.Itoa(i), appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"gpu": 100}))
askN.createTime = time.Now().Add(-2 * time.Minute)
@@ -1326,24 +1363,25 @@ func
TestTryPreemption_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *testing.T
// but last allocation should not be touched as preempting the same would make
usage goes above the guaranteed set on preemptor or ask queue
root.parent.parent2.child1.
// All remaining three allocation of each mem: 100 should not be touched at
all as there is no matching resource type between these allocs and ask resource
types.
func TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnPreemptorSide(t
*testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 3,
"gpu": 300, "mem": 200})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"vcores": "3", "gpu":
"300", "mem": "200"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
nil, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, nil)
+ parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil)
+ parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "2"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "2"}, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, nil)
+ childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil)
+ _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil, appQueueMapping)
assert.NilError(t, err)
- app1, app2, app3 := createVictimApplications(childQ2)
+ app1, app2, app3 := createVictimApplications(childQ2, appQueueMapping)
for i := 5; i < 8; i++ {
askN := newAllocationAsk(alloc+strconv.Itoa(i), appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"gpu": 100}))
askN.createTime = time.Now().Add(-2 * time.Minute)
@@ -1424,24 +1462,25 @@ func
TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnPreemptorSide(t *te
//
//nolint:funlen
func
TestTryPreemption_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorSides(t
*testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 5,
"gpu": 700, "mem": 200})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"vcores": "5", "gpu":
"700", "mem": "200"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
nil, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, nil)
+ parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil)
+ parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "2"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "2"}, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, map[string]string{"vcores": "1"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, map[string]string{"vcores": "1"}, appQueueMapping)
assert.NilError(t, err)
- _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil)
+ _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil, appQueueMapping)
assert.NilError(t, err)
- app1, app2, app3 := createVictimApplications(childQ2)
+ app1, app2, app3 := createVictimApplications(childQ2, appQueueMapping)
for i := 5; i < 8; i++ {
askN := newAllocationAsk(alloc+strconv.Itoa(i), appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"gpu": 100}))
askN.createTime = time.Now().Add(-2 * time.Minute)
@@ -1522,24 +1561,25 @@ func
TestTryPreemption_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorSides(t
// 3rd allocation of vcores:1 should not be touched as preempting the same
would make usage goes below the guaranteed set on root.parent.parent2.child2.
// All remaining three allocation of each mem: 100 should not be touched at
all as there is no matching resource type between these allocs and ask resource
types.
func
TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorSides(t
*testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 3,
"gpu": 700, "mem": 200})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"vcores": "3", "gpu":
"700", "mem": "200"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
nil, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, nil)
+ parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil)
+ parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "2"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "2"}, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, map[string]string{"vcores": "1"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ2, "child2", false,
nil, map[string]string{"vcores": "1"}, appQueueMapping)
assert.NilError(t, err)
- _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil)
+ _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil, appQueueMapping)
assert.NilError(t, err)
- app1, app2, app3 := createVictimApplications(childQ2)
+ app1, app2, app3 := createVictimApplications(childQ2, appQueueMapping)
for i := 5; i < 8; i++ {
askN := newAllocationAsk(alloc+strconv.Itoa(i), appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"gpu": 100}))
askN.createTime = time.Now().Add(-2 * time.Minute)
@@ -1617,27 +1657,29 @@ func
TestTryPreemption_OnNode_AskResTypesSame_GuaranteedSetOnVictimAndPreemptorS
// root.parent.parent2.child3. No usage, no guaranteed set
// 1 Allocation on root.parent.parent1.child1 should be preempted to free up
resources for ask arrived in root.parent.parent1.child2. Also, it won't lead to
preemption storm or loop.
func
TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_GNotSetOnVictimChild_As_Siblings(t
*testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 2})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"vcores": "25"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "20"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "20"}, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, map[string]string{"vcores": "10"})
+ parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, map[string]string{"vcores": "10"}, appQueueMapping)
assert.NilError(t, err)
- parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil)
+ parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, nil)
+ childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ1, "child2", false,
nil, map[string]string{"vcores": "1"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ1, "child2", false,
nil, map[string]string{"vcores": "1"}, appQueueMapping)
assert.NilError(t, err)
- _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil)
+ _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil, appQueueMapping)
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.parent1.child1")
app1.SetQueue(childQ1)
- childQ1.applications[appID1] = app1
+ childQ1.AddApplication(app1)
+ appQueueMapping.AddAppQueueMapping(appID1, childQ1)
ask1 := newAllocationAsk("alloc1", appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
ask1.createTime = time.Now().Add(-2 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
@@ -1649,7 +1691,8 @@ func
TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_GNotSetOnVictimChil
app2 := newApplication(appID2, "default", "root.parent.parent1.child1")
app2.SetQueue(childQ2)
- childQ1.applications[appID2] = app2
+ childQ1.AddApplication(app2)
+ appQueueMapping.AddAppQueueMapping(appID2, childQ2)
ask2 := newAllocationAsk("alloc2", appID2,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
ask2.createTime = time.Now().Add(-1 * time.Minute)
assert.NilError(t, app2.AddAllocationAsk(ask2))
@@ -1661,7 +1704,8 @@ func
TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_GNotSetOnVictimChil
app3 := newApplication(appID3, "default", "root.parent.parent1.child2")
app3.SetQueue(childQ2)
- childQ2.applications[appID3] = app3
+ childQ2.AddApplication(app3)
+ appQueueMapping.AddAppQueueMapping(appID3, childQ2)
ask3 := newAllocationAsk("alloc3", appID3,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
assert.NilError(t, app3.AddAllocationAsk(ask3))
@@ -1694,27 +1738,29 @@ func
TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_GNotSetOnVictimChil
// root.parent.parent2.child3. No usage, no guaranteed set
// 1 Allocation on root.parent.parent1.child1 should not be preempted to free
up resources for ask arrived in root.parent.parent1.child2 because it could
lead to preemption storm or loop.
func TestTryPreemption_OnNode_UGParent_With_GNotSetOnBothChilds(t *testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 2})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"vcores": "25"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "20"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "20"}, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, map[string]string{"vcores": "10"})
+ parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, map[string]string{"vcores": "10"}, appQueueMapping)
assert.NilError(t, err)
- parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil)
+ parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, nil)
+ childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ1, "child2", false,
nil, nil)
+ childQ2, err := createManagedQueueGuaranteed(parentQ1, "child2", false,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil)
+ _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil, appQueueMapping)
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.parent1.child1")
app1.SetQueue(childQ1)
- childQ1.applications[appID1] = app1
+ childQ1.AddApplication(app1)
+ appQueueMapping.AddAppQueueMapping(appID1, childQ1)
ask1 := newAllocationAsk("alloc1", appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
ask1.createTime = time.Now().Add(-2 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
@@ -1726,7 +1772,8 @@ func
TestTryPreemption_OnNode_UGParent_With_GNotSetOnBothChilds(t *testing.T) {
app2 := newApplication(appID2, "default", "root.parent.parent1.child1")
app2.SetQueue(childQ2)
- childQ1.applications[appID2] = app2
+ childQ1.AddApplication(app2)
+ appQueueMapping.AddAppQueueMapping(appID2, childQ2)
ask2 := newAllocationAsk("alloc2", appID2,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
ask2.createTime = time.Now().Add(-1 * time.Minute)
assert.NilError(t, app2.AddAllocationAsk(ask2))
@@ -1738,7 +1785,8 @@ func
TestTryPreemption_OnNode_UGParent_With_GNotSetOnBothChilds(t *testing.T) {
app3 := newApplication(appID3, "default", "root.parent.parent1.child2")
app3.SetQueue(childQ2)
- childQ2.applications[appID3] = app3
+ childQ2.AddApplication(app3)
+ appQueueMapping.AddAppQueueMapping(appID3, childQ2)
ask3 := newAllocationAsk("alloc3", appID3,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
assert.NilError(t, app3.AddAllocationAsk(ask3))
@@ -1765,27 +1813,29 @@ func
TestTryPreemption_OnNode_UGParent_With_GNotSetOnBothChilds(t *testing.T) {
// root.parent.parent2.child3. No usage, no guaranteed set
// 1 Allocation on root.parent.parent1.child1 should be preempted to free up
resources for ask arrived in root.parent.parent1.child2. Also, it won't lead to
preemption storm or loop.
func
TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_OGVictimChild_As_Siblings(t
*testing.T) {
+ appQueueMapping := NewAppQueueMapping()
node := newNode(nodeID1, map[string]resources.Quantity{"vcores": 2})
iterator := getNodeIteratorFn(node)
rootQ, err := createRootQueue(map[string]string{"vcores": "25"})
assert.NilError(t, err)
- parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "20"}, nil)
+ parentQ, err := createManagedQueueGuaranteed(rootQ, "parent", true,
map[string]string{"vcores": "20"}, nil, appQueueMapping)
assert.NilError(t, err)
- parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, map[string]string{"vcores": "10"})
+ parentQ1, err := createManagedQueueGuaranteed(parentQ, "parent1", true,
nil, map[string]string{"vcores": "10"}, appQueueMapping)
assert.NilError(t, err)
- parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil)
+ parentQ2, err := createManagedQueueGuaranteed(parentQ, "parent2", true,
nil, nil, appQueueMapping)
assert.NilError(t, err)
- childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "1"})
+ childQ1, err := createManagedQueueGuaranteed(parentQ1, "child1", false,
nil, map[string]string{"vcores": "1"}, appQueueMapping)
assert.NilError(t, err)
- childQ2, err := createManagedQueueGuaranteed(parentQ1, "child2", false,
nil, map[string]string{"vcores": "1"})
+ childQ2, err := createManagedQueueGuaranteed(parentQ1, "child2", false,
nil, map[string]string{"vcores": "1"}, appQueueMapping)
assert.NilError(t, err)
- _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil)
+ _, err = createManagedQueueGuaranteed(parentQ2, "child3", false, nil,
nil, appQueueMapping)
assert.NilError(t, err)
app1 := newApplication(appID1, "default", "root.parent.parent1.child1")
app1.SetQueue(childQ1)
- childQ1.applications[appID1] = app1
+ childQ1.AddApplication(app1)
+ appQueueMapping.AddAppQueueMapping(appID1, childQ1)
ask1 := newAllocationAsk("alloc1", appID1,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
ask1.createTime = time.Now().Add(-2 * time.Minute)
assert.NilError(t, app1.AddAllocationAsk(ask1))
@@ -1797,7 +1847,8 @@ func
TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_OGVictimChild_As_Si
app2 := newApplication(appID2, "default", "root.parent.parent1.child1")
app2.SetQueue(childQ2)
- childQ1.applications[appID2] = app2
+ childQ1.AddApplication(app2)
+ appQueueMapping.AddAppQueueMapping(appID2, childQ2)
ask2 := newAllocationAsk("alloc2", appID2,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
ask2.createTime = time.Now().Add(-1 * time.Minute)
assert.NilError(t, app2.AddAllocationAsk(ask2))
@@ -1809,7 +1860,8 @@ func
TestTryPreemption_OnNode_UGParent_With_UGPreemptorChild_OGVictimChild_As_Si
app3 := newApplication(appID3, "default", "root.parent.parent1.child2")
app3.SetQueue(childQ2)
- childQ2.applications[appID3] = app3
+ childQ2.AddApplication(app3)
+ appQueueMapping.AddAppQueueMapping(appID3, childQ2)
ask3 := newAllocationAsk("alloc3", appID3,
resources.NewResourceFromMap(map[string]resources.Quantity{"vcores": 1}))
assert.NilError(t, app3.AddAllocationAsk(ask3))
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 19b4a8da..a81b1bfa 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -88,6 +88,7 @@ type Queue struct {
allocatingAcceptedApps map[string]bool
template *template.Template
queueEvents *schedEvt.QueueEvents
+ appQueueMapping *AppQueueMapping // appID mapping to queues
locking.RWMutex
}
@@ -116,10 +117,11 @@ func newBlankQueue() *Queue {
// NewConfiguredQueue creates a new queue from scratch based on the
configuration
// lock free as it cannot be referenced yet.
// If the silence flag is set to true, the function will neither log the queue
creation nor send a queue event.
-func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue, silence bool)
(*Queue, error) {
+func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue, silence bool,
appQueueMapping *AppQueueMapping) (*Queue, error) {
sq := newBlankQueue()
sq.Name = strings.ToLower(conf.Name)
sq.QueuePath = strings.ToLower(conf.Name)
+ sq.appQueueMapping = appQueueMapping
if parent != nil {
sq.QueuePath = parent.QueuePath + configs.DOT + sq.Name
}
@@ -159,14 +161,14 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent
*Queue, silence bool) (
// NewRecoveryQueue creates a recovery queue if it does not exist. The
recovery queue
// is a dynamic queue, but has an invalid name so that it cannot be directly
referenced.
-func NewRecoveryQueue(parent *Queue) (*Queue, error) {
+func NewRecoveryQueue(parent *Queue, appQueueMapping *AppQueueMapping)
(*Queue, error) {
if parent == nil {
return nil, errors.New("recovery queue cannot be created with
nil parent")
}
if parent.GetQueuePath() != configs.RootQueue {
return nil, fmt.Errorf("recovery queue cannot be created with
non-root parent: %s", parent.GetQueuePath())
}
- queue, err := newDynamicQueueInternal(common.RecoveryQueue, true,
parent)
+ queue, err := newDynamicQueueInternal(common.RecoveryQueue, true,
parent, appQueueMapping)
if err == nil {
queue.Lock()
defer queue.Unlock()
@@ -179,7 +181,7 @@ func NewRecoveryQueue(parent *Queue) (*Queue, error) {
// NewDynamicQueue creates a new queue to be added to the system based on the
placement rules
// A dynamically added queue can never be the root queue so parent must be set
// lock free as it cannot be referenced yet
-func NewDynamicQueue(name string, leaf bool, parent *Queue) (*Queue, error) {
+func NewDynamicQueue(name string, leaf bool, parent *Queue, appQueueMapping
*AppQueueMapping) (*Queue, error) {
// fail without a parent
if parent == nil {
return nil, fmt.Errorf("dynamic queue can not be added without
parent: %s", name)
@@ -191,16 +193,17 @@ func NewDynamicQueue(name string, leaf bool, parent
*Queue) (*Queue, error) {
if name == common.RecoveryQueue {
return nil, fmt.Errorf("dynamic queue cannot be
root.@recovery@")
}
- return newDynamicQueueInternal(name, leaf, parent)
+ return newDynamicQueueInternal(name, leaf, parent, appQueueMapping)
}
-func newDynamicQueueInternal(name string, leaf bool, parent *Queue) (*Queue,
error) {
+func newDynamicQueueInternal(name string, leaf bool, parent *Queue,
appQueueMapping *AppQueueMapping) (*Queue, error) {
sq := newBlankQueue()
sq.Name = strings.ToLower(name)
sq.QueuePath = parent.QueuePath + configs.DOT + sq.Name
sq.parent = parent
sq.isManaged = false
sq.isLeaf = leaf
+ sq.appQueueMapping = appQueueMapping
// add to the parent, we might have a partition lock already
// still need to make sure we lock the parent so we do not interfere
with scheduling
@@ -1625,33 +1628,9 @@ func (sq *Queue) GetApplication(appID string)
*Application {
return sq.applications[appID]
}
-// FindQueueByAppID searches the queue hierarchy for an application with the
given appID and returns the queue it belongs to
-func (sq *Queue) FindQueueByAppID(appID string) *Queue {
- if sq == nil {
- return nil
- }
- if sq.parent != nil {
- return sq.parent.FindQueueByAppID(appID)
- }
- return sq.findQueueByAppIDInternal(appID)
-}
-
-func (sq *Queue) findQueueByAppIDInternal(appID string) *Queue {
- if sq == nil {
- return nil
- }
- if sq.IsLeafQueue() {
- if app := sq.GetApplication(appID); app != nil {
- return sq
- }
- return nil
- }
- for _, queue := range sq.GetCopyOfChildren() {
- if result := queue.findQueueByAppIDInternal(appID); result !=
nil {
- return result
- }
- }
- return nil
+// GetQueueByAppID returns the queue that the application with the given appID
belongs to
+func (sq *Queue) GetQueueByAppID(appID string) *Queue {
+ return sq.appQueueMapping.GetQueueByAppId(appID)
}
// getSortType return the queue sort type.
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index 0933ae54..c6c2594e 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -124,7 +124,7 @@ func TestDynamicSubQueues(t *testing.T) {
// single parent under root
var parent *Queue
- parent, err = createDynamicQueue(root, "parent", true)
+ parent, err = createDynamicQueue(root, "parent", true, nil)
assert.NilError(t, err, "failed to create parent queue")
if parent.IsLeafQueue() || parent.IsManaged() {
t.Errorf("parent queue is not marked as parent")
@@ -134,7 +134,7 @@ func TestDynamicSubQueues(t *testing.T) {
}
// add a leaf under the parent
var leaf *Queue
- leaf, err = createDynamicQueue(parent, "leaf", false)
+ leaf, err = createDynamicQueue(parent, "leaf", false, nil)
assert.NilError(t, err, "failed to create leaf queue")
if len(parent.children) == 0 {
t.Error("leaf queue is not added to the parent queue")
@@ -377,10 +377,10 @@ func TestGetChildQueueInfo(t *testing.T) {
t.Errorf("managed leaf queues are not added to the parent
queue, expected 10 children got %d", len(parent.children))
}
- parent, err = createDynamicQueue(root, "parent-un", true)
+ parent, err = createDynamicQueue(root, "parent-un", true, nil)
assert.NilError(t, err, "failed to create dynamic parent queue")
for i := 0; i < 10; i++ {
- _, err = createDynamicQueue(parent, "leaf-un-"+strconv.Itoa(i),
false)
+ _, err = createDynamicQueue(parent, "leaf-un-"+strconv.Itoa(i),
false, nil)
if err != nil {
t.Errorf("failed to create dynamic queue: %v", err)
}
@@ -1835,35 +1835,37 @@ func TestPreemptionDelay(t *testing.T) {
}
func TestFindQueueByAppID(t *testing.T) {
- root, err := createRootQueue(nil)
+ appQueueMapping := NewAppQueueMapping()
+ root, err := createRootQueueWithAppQueueMapping(nil, appQueueMapping)
assert.NilError(t, err, "failed to create queue")
- parent1, err := createManagedQueue(root, "parent1", true, nil)
+ parent1, err := createManagedQueueWithAppQueueMapping(root, "parent1",
true, nil, appQueueMapping)
assert.NilError(t, err, "failed to create queue")
- parent2, err := createManagedQueue(root, "parent2", true, nil)
+ parent2, err := createManagedQueueWithAppQueueMapping(root, "parent2",
true, nil, appQueueMapping)
assert.NilError(t, err, "failed to create queue")
- leaf1, err := createManagedQueue(parent1, "leaf1", false, nil)
+ leaf1, err := createManagedQueueWithAppQueueMapping(parent1, "leaf1",
false, nil, appQueueMapping)
assert.NilError(t, err, "failed to create queue")
- leaf2, err := createManagedQueue(parent2, "leaf2", false, nil)
+ leaf2, err := createManagedQueueWithAppQueueMapping(parent2, "leaf2",
false, nil, appQueueMapping)
assert.NilError(t, err, "failed to create queue")
app := newApplication(appID1, "default", "root.parent.leaf")
app.pending =
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 10})
leaf1.AddApplication(app)
+ appQueueMapping.AddAppQueueMapping(appID1, leaf1)
// we should be able to find the queue from any other given the appID
- assert.Equal(t, leaf1, root.FindQueueByAppID(appID1), "failed to find
queue from root")
- assert.Equal(t, leaf1, parent1.FindQueueByAppID(appID1), "failed to
find queue from parent1")
- assert.Equal(t, leaf1, parent2.FindQueueByAppID(appID1), "failed to
find queue from parent2")
- assert.Equal(t, leaf1, leaf1.FindQueueByAppID(appID1), "failed to find
queue from leaf1")
- assert.Equal(t, leaf1, leaf2.FindQueueByAppID(appID1), "failed to find
queue from leaf2")
+ assert.Equal(t, leaf1, root.GetQueueByAppID(appID1), "failed to find
queue from root")
+ assert.Equal(t, leaf1, parent1.GetQueueByAppID(appID1), "failed to find
queue from parent1")
+ assert.Equal(t, leaf1, parent2.GetQueueByAppID(appID1), "failed to find
queue from parent2")
+ assert.Equal(t, leaf1, leaf1.GetQueueByAppID(appID1), "failed to find
queue from leaf1")
+ assert.Equal(t, leaf1, leaf2.GetQueueByAppID(appID1), "failed to find
queue from leaf2")
// non-existent queue should be nil
var none *Queue = nil
- assert.Equal(t, none, root.FindQueueByAppID("missing"), "found queue
reference in root")
- assert.Equal(t, none, parent1.FindQueueByAppID("missing"), "found queue
reference in parent1")
- assert.Equal(t, none, parent2.FindQueueByAppID("missing"), "found queue
reference in parent2")
- assert.Equal(t, none, leaf1.FindQueueByAppID("missing"), "found queue
reference in leaf1")
- assert.Equal(t, none, leaf2.FindQueueByAppID("missing"), "found queue
reference in leaf2")
+ assert.Equal(t, none, root.GetQueueByAppID("missing"), "found queue
reference in root")
+ assert.Equal(t, none, parent1.GetQueueByAppID("missing"), "found queue
reference in parent1")
+ assert.Equal(t, none, parent2.GetQueueByAppID("missing"), "found queue
reference in parent2")
+ assert.Equal(t, none, leaf1.GetQueueByAppID("missing"), "found queue
reference in leaf1")
+ assert.Equal(t, none, leaf2.GetQueueByAppID("missing"), "found queue
reference in leaf2")
}
// nolint: funlen
@@ -1878,13 +1880,13 @@ func TestFindEligiblePreemptionVictims(t *testing.T) {
alloc3 := createAllocation("ask3", appID2, nodeID1, true, true, -1000,
false, res)
root, err := createRootQueue(map[string]string{siCommon.Memory: "1000"})
assert.NilError(t, err, "failed to create queue")
- parent1, err := createManagedQueueGuaranteed(root, "parent1", true,
parentMax, parentGuar)
+ parent1, err := createManagedQueueGuaranteed(root, "parent1", true,
parentMax, parentGuar, nil)
assert.NilError(t, err, "failed to create queue")
- parent2, err := createManagedQueueGuaranteed(root, "parent2", true,
parentMax, parentGuar)
+ parent2, err := createManagedQueueGuaranteed(root, "parent2", true,
parentMax, parentGuar, nil)
assert.NilError(t, err, "failed to create queue")
- leaf1, err := createManagedQueueGuaranteed(parent1, "leaf1", false,
nil, nil)
+ leaf1, err := createManagedQueueGuaranteed(parent1, "leaf1", false,
nil, nil, nil)
assert.NilError(t, err, "failed to create queue")
- leaf2, err := createManagedQueueGuaranteed(parent2, "leaf2", false,
nil, nil)
+ leaf2, err := createManagedQueueGuaranteed(parent2, "leaf2", false,
nil, nil, nil)
assert.NilError(t, err, "failed to create queue")
// verify no victims when no allocations exist
@@ -2164,7 +2166,7 @@ func TestApplyConf(t *testing.T) {
parent, err := createManagedQueueWithProps(nil, "parent", true, nil,
nil)
assert.NilError(t, err, "failed to create basic queue: %v", err)
- child, err := NewDynamicQueue("child", true, parent)
+ child, err := NewDynamicQueue("child", true, parent, nil)
assert.NilError(t, err, "failed to create basic queue: %v", err)
err = child.ApplyConf(childConf)
@@ -2264,7 +2266,7 @@ func TestNewConfiguredQueue(t *testing.T) {
},
},
}
- parent, err := NewConfiguredQueue(parentConfig, nil, false)
+ parent, err := NewConfiguredQueue(parentConfig, nil, false, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
assert.Equal(t, parent.Name, "parent_queue")
assert.Equal(t, parent.QueuePath, "parent_queue")
@@ -2284,7 +2286,7 @@ func TestNewConfiguredQueue(t *testing.T) {
Guaranteed: getResourceConf(),
},
}
- childLeaf, err := NewConfiguredQueue(leafConfig, parent, false)
+ childLeaf, err := NewConfiguredQueue(leafConfig, parent, false, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
assert.Equal(t, childLeaf.QueuePath, "parent_queue.leaf_queue")
assert.Assert(t, childLeaf.template == nil)
@@ -2301,7 +2303,7 @@ func TestNewConfiguredQueue(t *testing.T) {
Name: "nonleaf_queue",
Parent: true,
}
- childNonLeaf, err := NewConfiguredQueue(NonLeafConfig, parent, false)
+ childNonLeaf, err := NewConfiguredQueue(NonLeafConfig, parent, false,
nil)
assert.NilError(t, err, "failed to create queue: %v", err)
assert.Equal(t, childNonLeaf.QueuePath, "parent_queue.nonleaf_queue")
assert.Assert(t, reflect.DeepEqual(childNonLeaf.template,
parent.template))
@@ -2316,7 +2318,7 @@ func TestNewConfiguredQueue(t *testing.T) {
rootConfig := configs.QueueConfig{
Name: "root",
}
- _, err = NewConfiguredQueue(rootConfig, nil, true)
+ _, err = NewConfiguredQueue(rootConfig, nil, true, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
time.Sleep(time.Second)
noEvents := eventSystem.Store.CountStoredEvents()
@@ -2354,13 +2356,13 @@ func TestResetRunningState(t *testing.T) {
func TestNewRecoveryQueue(t *testing.T) {
var err error
- if _, err = NewRecoveryQueue(nil); err == nil {
+ if _, err = NewRecoveryQueue(nil, nil); err == nil {
t.Fatalf("recovery queue creation should fail with nil parent")
}
parent, err := createManagedQueueWithProps(nil, "parent", true, nil,
nil)
assert.NilError(t, err, "failed to create queue: %v", err)
- if _, err = NewRecoveryQueue(parent); err == nil {
+ if _, err = NewRecoveryQueue(parent, nil); err == nil {
t.Fatalf("recovery queue creation should fail with non-root
parent")
}
@@ -2370,9 +2372,9 @@ func TestNewRecoveryQueue(t *testing.T) {
Properties: map[string]string{configs.ApplicationSortPolicy:
"fair"},
ChildTemplate: configs.ChildTemplate{Properties:
map[string]string{configs.ApplicationSortPolicy: "fair"}},
}
- parent, err = NewConfiguredQueue(parentConfig, nil, false)
+ parent, err = NewConfiguredQueue(parentConfig, nil, false, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
- recoveryQueue, err := NewRecoveryQueue(parent)
+ recoveryQueue, err := NewRecoveryQueue(parent, nil)
assert.NilError(t, err, "failed to create recovery queue: %v", err)
assert.Equal(t, common.RecoveryQueueFull, recoveryQueue.GetQueuePath(),
"wrong queue name")
assert.Equal(t, policies.FifoSortPolicy, recoveryQueue.getSortType(),
"wrong sort type")
@@ -2381,7 +2383,7 @@ func TestNewRecoveryQueue(t *testing.T) {
func TestNewDynamicQueueDoesNotCreateRecovery(t *testing.T) {
parent, err := createRootQueue(nil)
assert.NilError(t, err, "failed to create queue: %v", err)
- if _, err := NewDynamicQueue(common.RecoveryQueue, true, parent); err
== nil {
+ if _, err := NewDynamicQueue(common.RecoveryQueue, true, parent, nil);
err == nil {
t.Fatalf("invalid recovery queue %s was created",
common.RecoveryQueueFull)
}
}
@@ -2400,7 +2402,7 @@ func TestNewDynamicQueue(t *testing.T) {
assert.NilError(t, err)
// case 0: leaf can use template
- childLeaf, err := NewDynamicQueue("leaf", true, parent)
+ childLeaf, err := NewDynamicQueue("leaf", true, parent, nil)
assert.NilError(t, err, "failed to create dynamic queue: %v", err)
assert.Assert(t, childLeaf.template == nil)
assert.Equal(t, childLeaf.maxRunningApps,
parent.template.GetMaxApplications())
@@ -2412,7 +2414,7 @@ func TestNewDynamicQueue(t *testing.T) {
assert.Equal(t, childLeaf.preemptionPolicy,
policies.DefaultPreemptionPolicy)
// case 1: non-leaf can't use template but it can inherit template from
parent
- childNonLeaf, err :=
NewDynamicQueue("nonleaf_Test-a_b_#_c_#_d_/_e@dom:ain", false, parent)
+ childNonLeaf, err :=
NewDynamicQueue("nonleaf_Test-a_b_#_c_#_d_/_e@dom:ain", false, parent, nil)
assert.NilError(t, err, "failed to create dynamic queue: %v", err)
assert.Assert(t, reflect.DeepEqual(childNonLeaf.template,
parent.template))
assert.Equal(t, len(childNonLeaf.properties), 0)
@@ -2423,7 +2425,7 @@ func TestNewDynamicQueue(t *testing.T) {
assert.Equal(t, childNonLeaf.preemptionPolicy,
policies.DefaultPreemptionPolicy)
// case 2: invalid queue name
- _, err = NewDynamicQueue("invalid!queue", false, parent)
+ _, err = NewDynamicQueue("invalid!queue", false, parent, nil)
if err == nil {
t.Errorf("new dynamic queue should have failed to create, err
is %v", err)
}
diff --git a/pkg/scheduler/objects/utilities_test.go
b/pkg/scheduler/objects/utilities_test.go
index 2df468c8..b7ca66e7 100644
--- a/pkg/scheduler/objects/utilities_test.go
+++ b/pkg/scheduler/objects/utilities_test.go
@@ -65,25 +65,33 @@ func createRootQueue(maxRes map[string]string) (*Queue,
error) {
return createManagedQueueWithProps(nil, "root", true, maxRes, nil)
}
+func createRootQueueWithAppQueueMapping(maxRes map[string]string,
appQueueMapping *AppQueueMapping) (*Queue, error) {
+ return createManagedQueueWithAppQueueMapping(nil, "root", true, maxRes,
appQueueMapping)
+}
+
// wrapper around the create call using the one syntax for all queue types
func createManagedQueue(parentSQ *Queue, name string, parent bool, maxRes
map[string]string) (*Queue, error) {
return createManagedQueueWithProps(parentSQ, name, parent, maxRes, nil)
}
+func createManagedQueueWithAppQueueMapping(parentSQ *Queue, name string,
parent bool, maxRes map[string]string, appQueueMapping *AppQueueMapping)
(*Queue, error) {
+ return createManagedQueuePropsMaxApps(parentSQ, name, parent, maxRes,
nil, nil, uint64(0), appQueueMapping)
+}
+
// create managed queue with props set
func createManagedQueueWithProps(parentSQ *Queue, name string, parent bool,
maxRes, props map[string]string) (*Queue, error) {
- return createManagedQueuePropsMaxApps(parentSQ, name, parent, maxRes,
nil, props, uint64(0))
+ return createManagedQueuePropsMaxApps(parentSQ, name, parent, maxRes,
nil, props, uint64(0), nil)
}
func createManagedQueueMaxApps(parentSQ *Queue, name string, parent bool,
maxRes map[string]string, maxApps uint64) (*Queue, error) {
- return createManagedQueuePropsMaxApps(parentSQ, name, parent, maxRes,
nil, nil, maxApps)
+ return createManagedQueuePropsMaxApps(parentSQ, name, parent, maxRes,
nil, nil, maxApps, nil)
}
-func createManagedQueueGuaranteed(parentSQ *Queue, name string, parent bool,
maxRes, guarRes map[string]string) (*Queue, error) {
- return createManagedQueuePropsMaxApps(parentSQ, name, parent, maxRes,
guarRes, nil, uint64(0))
+func createManagedQueueGuaranteed(parentSQ *Queue, name string, parent bool,
maxRes, guarRes map[string]string, appQueueMapping *AppQueueMapping) (*Queue,
error) {
+ return createManagedQueuePropsMaxApps(parentSQ, name, parent, maxRes,
guarRes, nil, uint64(0), appQueueMapping)
}
-func createManagedQueuePropsMaxApps(parentSQ *Queue, name string, parent bool,
maxRes map[string]string, guarRes map[string]string, props map[string]string,
maxApps uint64) (*Queue, error) {
+func createManagedQueuePropsMaxApps(parentSQ *Queue, name string, parent bool,
maxRes map[string]string, guarRes map[string]string, props map[string]string,
maxApps uint64, appQueueMapping *AppQueueMapping) (*Queue, error) {
queueConfig := configs.QueueConfig{
Name: name,
Parent: parent,
@@ -97,7 +105,7 @@ func createManagedQueuePropsMaxApps(parentSQ *Queue, name
string, parent bool, m
Guaranteed: guarRes,
}
}
- queue, err := NewConfiguredQueue(queueConfig, parentSQ, false)
+ queue, err := NewConfiguredQueue(queueConfig, parentSQ, false,
appQueueMapping)
if err != nil {
return nil, err
}
@@ -118,8 +126,8 @@ func createManagedQueuePropsMaxApps(parentSQ *Queue, name
string, parent bool, m
// wrapper around the create call using the one syntax for all queue types
// NOTE: test code uses a flag for parent=true, dynamic queues use leaf flag
-func createDynamicQueue(parentSQ *Queue, name string, parent bool) (*Queue,
error) {
- return NewDynamicQueue(name, !parent, parentSQ)
+func createDynamicQueue(parentSQ *Queue, name string, parent bool,
appQueueMapping *AppQueueMapping) (*Queue, error) {
+ return NewDynamicQueue(name, !parent, parentSQ, appQueueMapping)
}
// Create application with minimal info
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 8fb88419..62e40130 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -66,6 +66,7 @@ type PartitionContext struct {
placeholderAllocations int // number of
placeholder allocations
preemptionEnabled bool // whether
preemption is enabled or not
foreignAllocs map[string]*objects.Allocation // foreign
(non-Yunikorn) allocations
+ appQueueMapping *objects.AppQueueMapping // appID mapping
to queues
// The partition write lock must not be held while manipulating an
application.
// Scheduling is running continuously as a lock free background task.
Scheduling an application
@@ -96,6 +97,7 @@ func newPartitionContext(conf configs.PartitionConfig, rmID
string, cc *ClusterC
completedApplications: make(map[string]*objects.Application),
nodes: objects.NewNodeCollection(conf.Name),
foreignAllocs: make(map[string]*objects.Allocation),
+ appQueueMapping: objects.NewAppQueueMapping(),
}
pc.partitionManager = newPartitionManager(pc, cc)
if err := pc.initialPartitionFromConfig(conf, silence); err != nil {
@@ -116,7 +118,7 @@ func (pc *PartitionContext) initialPartitionFromConfig(conf
configs.PartitionCon
// Add the rest of the queue structure recursively
queueConf := conf.Queues[0]
var err error
- if pc.root, err = objects.NewConfiguredQueue(queueConf, nil, silence);
err != nil {
+ if pc.root, err = objects.NewConfiguredQueue(queueConf, nil, silence,
pc.appQueueMapping); err != nil {
return err
}
// recursively add the queues to the root
@@ -204,7 +206,7 @@ func (pc *PartitionContext) updatePartitionDetails(conf
configs.PartitionConfig)
func (pc *PartitionContext) addQueue(conf []configs.QueueConfig, parent
*objects.Queue, silence bool) error {
// create the queue at this level
for _, queueConf := range conf {
- thisQueue, err := objects.NewConfiguredQueue(queueConf, parent,
silence)
+ thisQueue, err := objects.NewConfiguredQueue(queueConf, parent,
silence, pc.appQueueMapping)
if err != nil {
return err
}
@@ -233,7 +235,7 @@ func (pc *PartitionContext) updateQueues(config
[]configs.QueueConfig, parent *o
queue := pc.getQueueInternal(pathName)
var err error
if queue == nil {
- queue, err = objects.NewConfiguredQueue(queueConfig,
parent, false)
+ queue, err = objects.NewConfiguredQueue(queueConfig,
parent, false, pc.appQueueMapping)
} else {
err = queue.ApplyConf(queueConfig)
}
@@ -386,6 +388,7 @@ func (pc *PartitionContext) AddApplication(app
*objects.Application) error {
app.SetTerminatedCallback(pc.moveTerminatedApp)
queue.AddApplication(app)
pc.applications[appID] = app
+ pc.appQueueMapping.AddAppQueueMapping(appID, queue)
return nil
}
@@ -404,6 +407,8 @@ func (pc *PartitionContext) removeApplication(appID string)
[]*objects.Allocatio
if queue := app.GetQueue(); queue != nil {
queue.RemoveApplication(app)
}
+ // Remove appID mapping
+ pc.appQueueMapping.RemoveAppQueueMapping(appID)
// Remove all allocations
allocations := app.RemoveAllAllocations()
// Remove all allocations from node(s) (queues have been updated
already)
@@ -509,7 +514,7 @@ func (pc *PartitionContext) GetPlacementRules()
[]*dao.RuleDAO {
// createRecoveryQueue creates the recovery queue to add to the hierarchy
func (pc *PartitionContext) createRecoveryQueue() (*objects.Queue, error) {
- return objects.NewRecoveryQueue(pc.root)
+ return objects.NewRecoveryQueue(pc.root, pc.appQueueMapping)
}
// Create a queue with full hierarchy. This is called when a new queue is
created from a placement rule.
@@ -543,7 +548,7 @@ func (pc *PartitionContext) createQueue(name string, user
security.UserGroup) (*
for i := len(toCreate) - 1; i >= 0; i-- {
// everything is checked and there should be no errors
var err error
- queue, err = objects.NewDynamicQueue(toCreate[i], i == 0, queue)
+ queue, err = objects.NewDynamicQueue(toCreate[i], i == 0,
queue, pc.appQueueMapping)
if err != nil {
log.Log(log.SchedPartition).Warn("Queue auto create
failed unexpected",
zap.String("queueName", toCreate[i]),
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index fd734c2f..9b8d0ba6 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -1530,7 +1530,7 @@ func TestGetQueue(t *testing.T) {
}
var parent *objects.Queue
// manually add the queue in below the root
- parent, err = objects.NewConfiguredQueue(parentConf, queue, false)
+ parent, err = objects.NewConfiguredQueue(parentConf, queue, false,
partition.appQueueMapping)
assert.NilError(t, err, "failed to create parent queue")
queue = partition.GetQueue("root.unknown")
assert.Equal(t, queue, nilQueue, "partition returned not nil for non
existing queue name request: %v", queue)
diff --git a/pkg/scheduler/placement/testrule_test.go
b/pkg/scheduler/placement/testrule_test.go
index 71be5721..cd4d2c74 100644
--- a/pkg/scheduler/placement/testrule_test.go
+++ b/pkg/scheduler/placement/testrule_test.go
@@ -78,7 +78,7 @@ func initQueueStructure(data []byte) error {
return err
}
rootConf := conf.Partitions[0].Queues[0]
- root, err = objects.NewConfiguredQueue(rootConf, nil, false)
+ root, err = objects.NewConfiguredQueue(rootConf, nil, false, nil)
if err != nil {
return err
}
@@ -87,7 +87,7 @@ func initQueueStructure(data []byte) error {
func addQueue(conf []configs.QueueConfig, parent *objects.Queue) error {
for _, queueConf := range conf {
- thisQueue, err := objects.NewConfiguredQueue(queueConf, parent,
false)
+ thisQueue, err := objects.NewConfiguredQueue(queueConf, parent,
false, nil)
if err != nil {
return err
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]