This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 174c5fd8 [YUNIKORN-2116] Track user/group events (#800)
174c5fd8 is described below
commit 174c5fd8f1199c1f8a57e436dcc2bad8782e9723
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Feb 22 17:19:50 2024 +1100
[YUNIKORN-2116] Track user/group events (#800)
Add events to the event system for user and group quotas. Additional
events are generated for configuration changes and usage changes for
both the user and groups.
The (un)linking of an app to a group is tracked in separate events.
Closes: #800
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
pkg/events/events.go | 4 +
pkg/events/events_test.go | 3 +
pkg/scheduler/objects/application.go | 42 ++++
pkg/scheduler/objects/application_events.go | 40 +++-
pkg/scheduler/objects/application_events_test.go | 92 +++++++++
pkg/scheduler/objects/application_test.go | 55 ++++++
pkg/scheduler/objects/queue.go | 5 +-
pkg/scheduler/tests/application_tracking_test.go | 62 +++---
pkg/scheduler/ugm/group_tracker.go | 28 ++-
pkg/scheduler/ugm/group_tracker_test.go | 137 +++++++++----
pkg/scheduler/ugm/manager.go | 68 +++----
pkg/scheduler/ugm/ugm_events.go | 116 +++++++++++
pkg/scheduler/ugm/ugm_events_test.go | 239 +++++++++++++++++++++++
pkg/scheduler/ugm/user_tracker.go | 41 +++-
pkg/scheduler/ugm/user_tracker_test.go | 164 ++++++++++++----
15 files changed, 942 insertions(+), 154 deletions(-)
diff --git a/pkg/events/events.go b/pkg/events/events.go
index be6f72b8..819dd661 100644
--- a/pkg/events/events.go
+++ b/pkg/events/events.go
@@ -54,3 +54,7 @@ func CreateNodeEventRecord(objectID, message, referenceID
string, changeType si.
func CreateQueueEventRecord(objectID, message, referenceID string, changeType
si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource
*resources.Resource) *si.EventRecord {
return createEventRecord(si.EventRecord_QUEUE, objectID, referenceID,
message, changeType, changeDetail, resource)
}
+
+func CreateUserGroupEventRecord(objectID, message, referenceID string,
changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail,
resource *resources.Resource) *si.EventRecord {
+ return createEventRecord(si.EventRecord_USERGROUP, objectID,
referenceID, message, changeType, changeDetail, resource)
+}
diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go
index dc86fcd1..11bca877 100644
--- a/pkg/events/events_test.go
+++ b/pkg/events/events_test.go
@@ -58,4 +58,7 @@ func TestCreateEventRecordTypes(t *testing.T) {
record = CreateQueueEventRecord("queue", "message", "app",
si.EventRecord_NONE, si.EventRecord_DETAILS_NONE, nil)
assert.Equal(t, record.Type, si.EventRecord_QUEUE)
+
+ record = CreateUserGroupEventRecord("user", "message", "queue",
si.EventRecord_NONE, si.EventRecord_DETAILS_NONE, nil)
+ assert.Equal(t, record.Type, si.EventRecord_USERGROUP)
}
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index b8d8f60a..cde6fbd9 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -112,6 +112,8 @@ type Application struct {
placeholderData map[string]*PlaceholderData // track placeholder
and gang related info
askMaxPriority int32 // highest priority
value of outstanding asks
hasPlaceholderAlloc bool // Whether there is at
least one allocated placeholder
+ runnableInQueue bool // whether the
application is runnable/schedulable in the queue. Default is true.
+ runnableByUserLimit bool // whether the
application is runnable/schedulable based on user/group quota. Default is true.
rmEventHandler handler.EventHandler
rmID string
@@ -171,6 +173,8 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi
security.UserGroup, eve
askMaxPriority: configs.MinPriority,
sortedRequests: sortedRequests{},
sendStateChangeEvents: true,
+ runnableByUserLimit: true,
+ runnableInQueue: true,
}
placeholderTimeout := common.ConvertSITimeoutWithAdjustment(siApp,
defaultPlaceholderTimeout)
gangSchedStyle := siApp.GetGangSchedulingStyle()
@@ -2083,3 +2087,41 @@ func getRateLimitedAppLog() *log.RateLimitedLogger {
})
return rateLimitedAppLog
}
+
+func (sa *Application) updateRunnableStatus(runnableInQueue,
runnableByUserLimit bool) {
+ sa.Lock()
+ defer sa.Unlock()
+ if sa.runnableInQueue != runnableInQueue {
+ if runnableInQueue {
+ log.Log(log.SchedApplication).Info("Application is now
runnable in queue",
+ zap.String("appID", sa.ApplicationID),
+ zap.String("queue", sa.queuePath))
+ sa.appEvents.sendAppRunnableInQueueEvent()
+ } else {
+ log.Log(log.SchedApplication).Info("Maximum number of
running applications reached the queue limit",
+ zap.String("appID", sa.ApplicationID),
+ zap.String("queue", sa.queuePath))
+ sa.appEvents.sendAppNotRunnableInQueueEvent()
+ }
+ }
+ sa.runnableInQueue = runnableInQueue
+
+ if sa.runnableByUserLimit != runnableByUserLimit {
+ if runnableByUserLimit {
+ log.Log(log.SchedApplication).Info("Application is now
runnable based on user/group quota",
+ zap.String("appID", sa.ApplicationID),
+ zap.String("queue", sa.queuePath),
+ zap.String("user", sa.user.User),
+ zap.Strings("groups", sa.user.Groups))
+ sa.appEvents.sendAppRunnableQuotaEvent()
+ } else {
+ log.Log(log.SchedApplication).Info("Maximum number of
running applications reached the user/group limit",
+ zap.String("appID", sa.ApplicationID),
+ zap.String("queue", sa.queuePath),
+ zap.String("user", sa.user.User),
+ zap.Strings("groups", sa.user.Groups))
+ sa.appEvents.sendAppNotRunnableQuotaEvent()
+ }
+ }
+ sa.runnableByUserLimit = runnableByUserLimit
+}
diff --git a/pkg/scheduler/objects/application_events.go
b/pkg/scheduler/objects/application_events.go
index 58a24ab9..0f0aa5d6 100644
--- a/pkg/scheduler/objects/application_events.go
+++ b/pkg/scheduler/objects/application_events.go
@@ -83,7 +83,7 @@ func (evt *applicationEvents) sendRemoveAskEvent(request
*AllocationAsk, detail
if !evt.eventSystem.IsEventTrackingEnabled() {
return
}
- event := events.CreateAppEventRecord(evt.app.ApplicationID, "",
request.GetAllocationKey(), si.EventRecord_REMOVE, detail,
request.GetAllocatedResource())
+ event := events.CreateAppEventRecord(evt.app.ApplicationID,
common.Empty, request.GetAllocationKey(), si.EventRecord_REMOVE, detail,
request.GetAllocatedResource())
evt.eventSystem.AddEvent(event)
}
@@ -91,7 +91,7 @@ func (evt *applicationEvents) sendNewApplicationEvent() {
if !evt.eventSystem.IsEventTrackingEnabled() {
return
}
- event := events.CreateAppEventRecord(evt.app.ApplicationID, "", "",
si.EventRecord_ADD, si.EventRecord_DETAILS_NONE, evt.app.allocatedResource)
+ event := events.CreateAppEventRecord(evt.app.ApplicationID,
common.Empty, common.Empty, si.EventRecord_ADD, si.EventRecord_DETAILS_NONE,
evt.app.allocatedResource)
evt.eventSystem.AddEvent(event)
}
@@ -99,7 +99,7 @@ func (evt *applicationEvents) sendRemoveApplicationEvent() {
if !evt.eventSystem.IsEventTrackingEnabled() {
return
}
- event := events.CreateAppEventRecord(evt.app.ApplicationID, "", "",
si.EventRecord_REMOVE, si.EventRecord_DETAILS_NONE, evt.app.allocatedResource)
+ event := events.CreateAppEventRecord(evt.app.ApplicationID,
common.Empty, common.Empty, si.EventRecord_REMOVE, si.EventRecord_DETAILS_NONE,
evt.app.allocatedResource)
evt.eventSystem.AddEvent(event)
}
@@ -107,7 +107,39 @@ func (evt *applicationEvents)
sendStateChangeEvent(changeDetail si.EventRecord_C
if !evt.eventSystem.IsEventTrackingEnabled() ||
!evt.app.sendStateChangeEvents {
return
}
- event := events.CreateAppEventRecord(evt.app.ApplicationID, eventInfo,
"", si.EventRecord_SET, changeDetail, evt.app.allocatedResource)
+ event := events.CreateAppEventRecord(evt.app.ApplicationID, eventInfo,
common.Empty, si.EventRecord_SET, changeDetail, evt.app.allocatedResource)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *applicationEvents) sendAppNotRunnableInQueueEvent() {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(evt.app.ApplicationID,
common.Empty, common.Empty, si.EventRecord_NONE,
si.EventRecord_APP_CANNOTRUN_QUEUE, nil)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *applicationEvents) sendAppRunnableInQueueEvent() {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(evt.app.ApplicationID,
common.Empty, common.Empty, si.EventRecord_NONE,
si.EventRecord_APP_RUNNABLE_QUEUE, nil)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *applicationEvents) sendAppNotRunnableQuotaEvent() {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(evt.app.ApplicationID,
common.Empty, common.Empty, si.EventRecord_NONE,
si.EventRecord_APP_CANNOTRUN_QUOTA, nil)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *applicationEvents) sendAppRunnableQuotaEvent() {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(evt.app.ApplicationID,
common.Empty, common.Empty, si.EventRecord_NONE,
si.EventRecord_APP_RUNNABLE_QUOTA, nil)
evt.eventSystem.AddEvent(event)
}
diff --git a/pkg/scheduler/objects/application_events_test.go
b/pkg/scheduler/objects/application_events_test.go
index 7e29f6ae..1dd01c4a 100644
--- a/pkg/scheduler/objects/application_events_test.go
+++ b/pkg/scheduler/objects/application_events_test.go
@@ -343,3 +343,95 @@ func TestSendStateChangeEvent(t *testing.T) {
assert.Equal(t, "", event.ReferenceID)
assert.Equal(t, "Failed to add application to partition (placement
rejected)", event.Message)
}
+
+func TestSendAppCannotRunInQueueEvent(t *testing.T) {
+ app := &Application{
+ ApplicationID: appID0,
+ queuePath: "root.test",
+ sendStateChangeEvents: true,
+ }
+ eventSystem := mock.NewEventSystemDisabled()
+ appEvents := newApplicationEvents(app, eventSystem)
+ appEvents.sendAppNotRunnableInQueueEvent()
+ assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
+
+ eventSystem = mock.NewEventSystem()
+ appEvents = newApplicationEvents(app, eventSystem)
+ appEvents.sendAppNotRunnableInQueueEvent()
+ event := eventSystem.Events[0]
+ assert.Equal(t, si.EventRecord_APP, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUEUE,
event.EventChangeDetail)
+ assert.Equal(t, "app-0", event.ObjectID)
+ assert.Equal(t, "", event.ReferenceID)
+ assert.Equal(t, "", event.Message)
+}
+
+func TestSendAppCannotRunByQuotaEvent(t *testing.T) {
+ app := &Application{
+ ApplicationID: appID0,
+ queuePath: "root.test",
+ sendStateChangeEvents: true,
+ }
+ eventSystem := mock.NewEventSystemDisabled()
+ appEvents := newApplicationEvents(app, eventSystem)
+ appEvents.sendAppNotRunnableQuotaEvent()
+ assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
+
+ eventSystem = mock.NewEventSystem()
+ appEvents = newApplicationEvents(app, eventSystem)
+ appEvents.sendAppNotRunnableQuotaEvent()
+ event := eventSystem.Events[0]
+ assert.Equal(t, si.EventRecord_APP, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUOTA,
event.EventChangeDetail)
+ assert.Equal(t, "app-0", event.ObjectID)
+ assert.Equal(t, "", event.ReferenceID)
+ assert.Equal(t, "", event.Message)
+}
+
+func TestSendAppRunnableInQueueEvent(t *testing.T) {
+ app := &Application{
+ ApplicationID: appID0,
+ queuePath: "root.test",
+ sendStateChangeEvents: true,
+ }
+ eventSystem := mock.NewEventSystemDisabled()
+ appEvents := newApplicationEvents(app, eventSystem)
+ appEvents.sendAppRunnableInQueueEvent()
+ assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
+
+ eventSystem = mock.NewEventSystem()
+ appEvents = newApplicationEvents(app, eventSystem)
+ appEvents.sendAppRunnableInQueueEvent()
+ event := eventSystem.Events[0]
+ assert.Equal(t, si.EventRecord_APP, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_APP_RUNNABLE_QUEUE,
event.EventChangeDetail)
+ assert.Equal(t, "app-0", event.ObjectID)
+ assert.Equal(t, "", event.ReferenceID)
+ assert.Equal(t, "", event.Message)
+}
+
+func TestSendAppRunnableByQuotaEvent(t *testing.T) {
+ app := &Application{
+ ApplicationID: appID0,
+ queuePath: "root.test",
+ sendStateChangeEvents: true,
+ }
+ eventSystem := mock.NewEventSystemDisabled()
+ appEvents := newApplicationEvents(app, eventSystem)
+ appEvents.sendAppRunnableQuotaEvent()
+ assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
+
+ eventSystem = mock.NewEventSystem()
+ appEvents = newApplicationEvents(app, eventSystem)
+ appEvents.sendAppRunnableQuotaEvent()
+ event := eventSystem.Events[0]
+ assert.Equal(t, si.EventRecord_APP, event.Type)
+ assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
+ assert.Equal(t, si.EventRecord_APP_RUNNABLE_QUOTA,
event.EventChangeDetail)
+ assert.Equal(t, "app-0", event.ObjectID)
+ assert.Equal(t, "", event.ReferenceID)
+ assert.Equal(t, "", event.Message)
+}
diff --git a/pkg/scheduler/objects/application_test.go
b/pkg/scheduler/objects/application_test.go
index 3136d06a..bc055ee6 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -2601,6 +2601,61 @@ func TestGetRateLimitedAppLog(t *testing.T) {
assert.Check(t, l != nil)
}
+func TestUpdateRunnableStatus(t *testing.T) {
+ app := newApplication(appID0, "default", "root.unknown")
+ assert.Assert(t, app.runnableInQueue)
+ assert.Assert(t, app.runnableByUserLimit)
+ eventSystem := mock.NewEventSystem()
+ app.appEvents = newApplicationEvents(app, eventSystem)
+
+ // App runnable - no events
+ app.updateRunnableStatus(true, true)
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ // App not runnable in queue
+ eventSystem.Reset()
+ app.updateRunnableStatus(false, true)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUEUE,
eventSystem.Events[0].EventChangeDetail)
+ // Try again - no new events
+ app.updateRunnableStatus(false, true)
+ assert.Equal(t, 1, len(eventSystem.Events))
+
+ // App becomes runnable in queue
+ eventSystem.Reset()
+ app.updateRunnableStatus(true, true)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ assert.Equal(t, si.EventRecord_APP_RUNNABLE_QUEUE,
eventSystem.Events[0].EventChangeDetail)
+
+ // Try again - no new events
+ app.updateRunnableStatus(true, true)
+ assert.Equal(t, 1, len(eventSystem.Events))
+
+ // App not runnable by UG quota
+ eventSystem.Reset()
+ app.updateRunnableStatus(true, false)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUOTA,
eventSystem.Events[0].EventChangeDetail)
+ // Try again - no new events
+ app.updateRunnableStatus(true, false)
+ assert.Equal(t, 1, len(eventSystem.Events))
+
+ // App becomes runnable by user quota
+ eventSystem.Reset()
+ app.updateRunnableStatus(true, true)
+ assert.Equal(t, si.EventRecord_APP_RUNNABLE_QUOTA,
eventSystem.Events[0].EventChangeDetail)
+ // Try again - no new events
+ app.updateRunnableStatus(true, true)
+ assert.Equal(t, 1, len(eventSystem.Events))
+
+ // Both false
+ eventSystem.Reset()
+ app.updateRunnableStatus(false, false)
+ assert.Equal(t, 2, len(eventSystem.Events))
+ assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUEUE,
eventSystem.Events[0].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_APP_CANNOTRUN_QUOTA,
eventSystem.Events[1].EventChangeDetail)
+}
+
func (sa *Application) addPlaceholderDataWithLocking(ask *AllocationAsk) {
sa.Lock()
defer sa.Unlock()
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 43e0b4ed..ebf70e29 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -1355,7 +1355,10 @@ func (sq *Queue) TryAllocate(iterator func()
NodeIterator, fullIterator func() N
// process the apps (filters out app without pending requests)
for _, app := range sq.sortApplications(true, false) {
- if app.IsAccepted() &&
(!sq.canRunApp(app.ApplicationID) ||
!ugm.GetUserManager().CanRunApp(sq.QueuePath, app.ApplicationID, app.user)) {
+ runnableInQueue := sq.canRunApp(app.ApplicationID)
+ runnableByUserLimit :=
ugm.GetUserManager().CanRunApp(sq.QueuePath, app.ApplicationID, app.user)
+ app.updateRunnableStatus(runnableInQueue,
runnableByUserLimit)
+ if app.IsAccepted() && (!runnableInQueue ||
!runnableByUserLimit) {
continue
}
alloc := app.tryAllocate(headRoom, allowPreemption,
preemptionDelay, &preemptAttemptsRemaining, iterator, fullIterator, getnode)
diff --git a/pkg/scheduler/tests/application_tracking_test.go
b/pkg/scheduler/tests/application_tracking_test.go
index 9f38b2d5..4f20cfd3 100644
--- a/pkg/scheduler/tests/application_tracking_test.go
+++ b/pkg/scheduler/tests/application_tracking_test.go
@@ -126,9 +126,9 @@ func TestApplicationHistoryTracking(t *testing.T) {
ms.mockRM.waitForAllocations(t, 1, 1000)
eventsDao, err = client.GetBatchEvents()
assert.NilError(t, err)
- assert.Equal(t, 12, len(eventsDao.EventRecords), "number of events
generated")
+ assert.Equal(t, 13, len(eventsDao.EventRecords), "number of events
generated")
verifyAllocationAskAddedEvents(t, eventsDao.EventRecords[7:])
- events = getEventsFromStream(t, stream, 5)
+ events = getEventsFromStream(t, stream, 6)
verifyAllocationAskAddedEvents(t, events)
allocations := ms.mockRM.getAllocations()
@@ -163,9 +163,9 @@ func TestApplicationHistoryTracking(t *testing.T) {
eventsDao, err = client.GetBatchEvents()
assert.NilError(t, err)
- assert.Equal(t, 15, len(eventsDao.EventRecords), "number of events
generated")
- verifyAllocationCancelledEvents(t, eventsDao.EventRecords[12:])
- events = getEventsFromStream(t, stream, 3)
+ assert.Equal(t, 17, len(eventsDao.EventRecords), "number of events
generated")
+ verifyAllocationCancelledEvents(t, eventsDao.EventRecords[13:])
+ events = getEventsFromStream(t, stream, 4)
assert.NilError(t, err)
verifyAllocationCancelledEvents(t, events)
}
@@ -314,35 +314,51 @@ func verifyAllocationAskAddedEvents(t *testing.T, events
[]*si.EventRecord) {
assert.Equal(t, si.EventRecord_SET, events[3].EventChangeType)
assert.Equal(t, si.EventRecord_APP_STARTING,
events[3].EventChangeDetail)
- // adding allocation to the App
- assert.Equal(t, "app-1", events[4].ObjectID)
+ // Track resource usage for the user - increment
+ assert.Equal(t, "testuser", events[4].ObjectID)
assert.Equal(t, "", events[4].Message)
- assert.Equal(t, si.EventRecord_APP, events[4].Type)
+ assert.Equal(t, "root.singleleaf", events[4].ReferenceID)
+ assert.Equal(t, si.EventRecord_USERGROUP, events[4].Type)
assert.Equal(t, si.EventRecord_ADD, events[4].EventChangeType)
- assert.Equal(t, si.EventRecord_APP_ALLOC, events[4].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_UG_USER_RESOURCE,
events[4].EventChangeDetail)
+
+ // adding allocation to the App
+ assert.Equal(t, "app-1", events[5].ObjectID)
+ assert.Equal(t, "", events[5].Message)
+ assert.Equal(t, si.EventRecord_APP, events[5].Type)
+ assert.Equal(t, si.EventRecord_ADD, events[5].EventChangeType)
+ assert.Equal(t, si.EventRecord_APP_ALLOC, events[5].EventChangeDetail)
}
func verifyAllocationCancelledEvents(t *testing.T, events []*si.EventRecord) {
- // state transition to Completing
- assert.Equal(t, "app-1", events[0].ObjectID)
+ // Track resource usage for the user - decrement
+ assert.Equal(t, "testuser", events[0].ObjectID)
assert.Equal(t, "", events[0].Message)
- assert.Equal(t, "", events[0].ReferenceID)
- assert.Equal(t, si.EventRecord_APP, events[0].Type)
- assert.Equal(t, si.EventRecord_SET, events[0].EventChangeType)
- assert.Equal(t, si.EventRecord_APP_COMPLETING,
events[0].EventChangeDetail)
+ assert.Equal(t, "root.singleleaf", events[0].ReferenceID)
+ assert.Equal(t, si.EventRecord_USERGROUP, events[0].Type)
+ assert.Equal(t, si.EventRecord_REMOVE, events[0].EventChangeType)
+ assert.Equal(t, si.EventRecord_UG_USER_RESOURCE,
events[0].EventChangeDetail)
- // cancel allocation
+ // state transition to Completing
assert.Equal(t, "app-1", events[1].ObjectID)
assert.Equal(t, "", events[1].Message)
+ assert.Equal(t, "", events[1].ReferenceID)
assert.Equal(t, si.EventRecord_APP, events[1].Type)
- assert.Equal(t, si.EventRecord_REMOVE, events[1].EventChangeType)
- assert.Equal(t, si.EventRecord_ALLOC_CANCEL,
events[1].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_SET, events[1].EventChangeType)
+ assert.Equal(t, si.EventRecord_APP_COMPLETING,
events[1].EventChangeDetail)
- // remove allocation from the node
- assert.Equal(t, "node-1:1234", events[2].ObjectID)
+ // cancel allocation
+ assert.Equal(t, "app-1", events[2].ObjectID)
assert.Equal(t, "", events[2].Message)
- assert.Equal(t, "alloc-1", events[2].ReferenceID)
- assert.Equal(t, si.EventRecord_NODE, events[2].Type)
+ assert.Equal(t, si.EventRecord_APP, events[2].Type)
assert.Equal(t, si.EventRecord_REMOVE, events[2].EventChangeType)
- assert.Equal(t, si.EventRecord_NODE_ALLOC, events[2].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_ALLOC_CANCEL,
events[2].EventChangeDetail)
+
+ // remove allocation from the node
+ assert.Equal(t, "node-1:1234", events[3].ObjectID)
+ assert.Equal(t, "", events[3].Message)
+ assert.Equal(t, "alloc-1", events[3].ReferenceID)
+ assert.Equal(t, si.EventRecord_NODE, events[3].Type)
+ assert.Equal(t, si.EventRecord_REMOVE, events[3].EventChangeType)
+ assert.Equal(t, si.EventRecord_NODE_ALLOC, events[3].EventChangeDetail)
}
diff --git a/pkg/scheduler/ugm/group_tracker.go
b/pkg/scheduler/ugm/group_tracker.go
index 255eeafa..fc94febd 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -19,9 +19,11 @@
package ugm
import (
+ "strings"
"sync"
"github.com/apache/yunikorn-core/pkg/common"
+ "github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
@@ -30,40 +32,44 @@ type GroupTracker struct {
groupName string // Name of the group for which usage is
being tracked upon
applications map[string]string // Hold applications currently run by
all users belong to this group
queueTracker *QueueTracker // Holds the actual resource usage of
queue path where application run
+ events *ugmEvents
sync.RWMutex
}
-func newGroupTracker(groupName string) *GroupTracker {
+func newGroupTracker(groupName string, events *ugmEvents) *GroupTracker {
queueTracker := newRootQueueTracker(group)
groupTracker := &GroupTracker{
groupName: groupName,
applications: make(map[string]string),
queueTracker: queueTracker,
+ events: events,
}
return groupTracker
}
-func (gt *GroupTracker) increaseTrackedResource(hierarchy []string,
applicationID string, usage *resources.Resource, user string) bool {
+func (gt *GroupTracker) increaseTrackedResource(queuePath, applicationID
string, usage *resources.Resource, user string) bool {
if gt == nil {
return true
}
gt.Lock()
defer gt.Unlock()
+ gt.events.sendIncResourceUsageForGroup(gt.groupName, queuePath, usage)
gt.applications[applicationID] = user
- return gt.queueTracker.increaseTrackedResource(hierarchy,
applicationID, group, usage)
+ return gt.queueTracker.increaseTrackedResource(strings.Split(queuePath,
configs.DOT), applicationID, group, usage)
}
-func (gt *GroupTracker) decreaseTrackedResource(hierarchy []string,
applicationID string, usage *resources.Resource, removeApp bool) (bool, bool) {
+func (gt *GroupTracker) decreaseTrackedResource(queuePath, applicationID
string, usage *resources.Resource, removeApp bool) (bool, bool) {
if gt == nil {
return false, true
}
gt.Lock()
defer gt.Unlock()
+ gt.events.sendDecResourceUsageForGroup(gt.groupName, queuePath, usage)
if removeApp {
delete(gt.applications, applicationID)
}
- return gt.queueTracker.decreaseTrackedResource(hierarchy,
applicationID, usage, removeApp)
+ return gt.queueTracker.decreaseTrackedResource(strings.Split(queuePath,
configs.DOT), applicationID, usage, removeApp)
}
func (gt *GroupTracker) getTrackedApplications() map[string]string {
@@ -72,10 +78,18 @@ func (gt *GroupTracker) getTrackedApplications()
map[string]string {
return gt.applications
}
-func (gt *GroupTracker) setLimits(hierarchy []string, resource
*resources.Resource, maxApps uint64) {
+func (gt *GroupTracker) setLimits(queuePath string, resource
*resources.Resource, maxApps uint64) {
gt.Lock()
defer gt.Unlock()
- gt.queueTracker.setLimit(hierarchy, resource, maxApps, false, group,
false)
+ gt.events.sendLimitSetForGroup(gt.groupName, queuePath)
+ gt.queueTracker.setLimit(strings.Split(queuePath, configs.DOT),
resource, maxApps, false, group, false)
+}
+
+func (gt *GroupTracker) clearLimits(queuePath string) {
+ gt.Lock()
+ defer gt.Unlock()
+ gt.events.sendLimitRemoveForGroup(gt.groupName, queuePath)
+ gt.queueTracker.setLimit(strings.Split(queuePath, configs.DOT), nil, 0,
false, group, false)
}
func (gt *GroupTracker) headroom(hierarchy []string) *resources.Resource {
diff --git a/pkg/scheduler/ugm/group_tracker_test.go
b/pkg/scheduler/ugm/group_tracker_test.go
index a9998ab2..08dca63f 100644
--- a/pkg/scheduler/ugm/group_tracker_test.go
+++ b/pkg/scheduler/ugm/group_tracker_test.go
@@ -19,12 +19,16 @@
package ugm
import (
+ "strings"
"testing"
"gotest.tools/v3/assert"
+ "github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
+ "github.com/apache/yunikorn-core/pkg/events/mock"
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
func TestGTIncreaseTrackedResource(t *testing.T) {
@@ -34,7 +38,8 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
// root->parent->child12 (similar name like above leaf queue, but it is
being treated differently as similar names are allowed)
manager := GetUserManager()
user := &security.UserGroup{User: "test", Groups: []string{"test"}}
- groupTracker := newGroupTracker(user.User)
+ eventSystem := mock.NewEventSystem()
+ groupTracker := newGroupTracker(user.User, newUGMEvents(eventSystem))
usage1, err := resources.NewResourceFromConf(map[string]string{"mem":
"10M", "vcore": "10"})
if err != nil {
@@ -42,36 +47,39 @@ func TestGTIncreaseTrackedResource(t *testing.T) {
}
manager.Headroom(queuePath1, TestApp1, *user)
- result := groupTracker.increaseTrackedResource(hierarchy1, TestApp1,
usage1, user.User)
+ result := groupTracker.increaseTrackedResource(path1, TestApp1, usage1,
user.User)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v", hierarchy1, TestApp1, usage1)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp1, usage1)
}
+ assert.Equal(t, 1, len(eventSystem.Events))
+ assert.Equal(t, si.EventRecord_UG_GROUP_RESOURCE,
eventSystem.Events[0].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_ADD,
eventSystem.Events[0].EventChangeType)
usage2, err := resources.NewResourceFromConf(map[string]string{"mem":
"20M", "vcore": "20"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = groupTracker.increaseTrackedResource(hierarchy2, TestApp2,
usage2, user.User)
+ result = groupTracker.increaseTrackedResource(path2, TestApp2, usage2,
user.User)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v", hierarchy2, TestApp2, usage2)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path2, TestApp2, usage2)
}
usage3, err := resources.NewResourceFromConf(map[string]string{"mem":
"30M", "vcore": "30"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- result = groupTracker.increaseTrackedResource(hierarchy3, TestApp3,
usage3, user.User)
+ result = groupTracker.increaseTrackedResource(path3, TestApp3, usage3,
user.User)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v", hierarchy3, TestApp3, usage3)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path3, TestApp3, usage3)
}
usage4, err := resources.NewResourceFromConf(map[string]string{"mem":
"20M", "vcore": "20"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- result = groupTracker.increaseTrackedResource(hierarchy4, TestApp4,
usage4, user.User)
+ result = groupTracker.increaseTrackedResource(path4, TestApp4, usage4,
user.User)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v", hierarchy4, TestApp4, usage4)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path4, TestApp4, usage4)
}
actualResources := getGroupResource(groupTracker)
@@ -91,14 +99,15 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
// Initialize ugm
GetUserManager()
user := &security.UserGroup{User: "test", Groups: []string{"test"}}
- groupTracker := newGroupTracker(user.User)
+ eventSystem := mock.NewEventSystem()
+ groupTracker := newGroupTracker(user.User, newUGMEvents(eventSystem))
usage1, err := resources.NewResourceFromConf(map[string]string{"mem":
"70M", "vcore": "70"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := groupTracker.increaseTrackedResource(hierarchy1, TestApp1,
usage1, user.User)
+ result := groupTracker.increaseTrackedResource(path1, TestApp1, usage1,
user.User)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v", hierarchy1, TestApp1, usage1)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp1, usage1)
}
assert.Equal(t, 1, len(groupTracker.getTrackedApplications()))
@@ -106,9 +115,9 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = groupTracker.increaseTrackedResource(hierarchy2, TestApp2,
usage2, user.User)
+ result = groupTracker.increaseTrackedResource(path2, TestApp2, usage2,
user.User)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v", hierarchy2, TestApp2, usage2)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path2, TestApp2, usage2)
}
actualResources := getGroupResource(groupTracker)
@@ -122,15 +131,19 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- removeQT, decreased := groupTracker.decreaseTrackedResource(hierarchy1,
TestApp1, usage3, false)
+ eventSystem.Reset()
+ removeQT, decreased := groupTracker.decreaseTrackedResource(path1,
TestApp1, usage3, false)
if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy1, TestApp1, usage3, err)
+ t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp1, usage3, err)
}
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
+ assert.Equal(t, 1, len(eventSystem.Events))
+ assert.Equal(t, si.EventRecord_UG_GROUP_RESOURCE,
eventSystem.Events[0].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_REMOVE,
eventSystem.Events[0].EventChangeType)
- removeQT, decreased = groupTracker.decreaseTrackedResource(hierarchy2,
TestApp2, usage3, false)
+ removeQT, decreased = groupTracker.decreaseTrackedResource(path2,
TestApp2, usage3, false)
if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy2, TestApp2, usage3, err)
+ t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path2, TestApp2, usage3, err)
}
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
@@ -146,9 +159,9 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- removeQT, decreased = groupTracker.decreaseTrackedResource(hierarchy1,
TestApp1, usage4, true)
+ removeQT, decreased = groupTracker.decreaseTrackedResource(path1,
TestApp1, usage4, true)
if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy1, TestApp1, usage1, err)
+ t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp1, usage1, err)
}
assert.Equal(t, 1, len(groupTracker.getTrackedApplications()))
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
@@ -158,44 +171,98 @@ func TestGTDecreaseTrackedResource(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- removeQT, decreased = groupTracker.decreaseTrackedResource(hierarchy2,
TestApp2, usage5, true)
+ removeQT, decreased = groupTracker.decreaseTrackedResource(path2,
TestApp2, usage5, true)
if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy2, TestApp2, usage2, err)
+ t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path2, TestApp2, usage2, err)
}
assert.Equal(t, 0, len(groupTracker.getTrackedApplications()))
assert.Equal(t, removeQT, true, "wrong remove queue tracker value")
}
-func TestGTSetMaxLimits(t *testing.T) {
+func TestGTSetAndClearMaxLimits(t *testing.T) {
// Queue setup:
// root->parent->child1
// Initialize ugm
GetUserManager()
user := security.UserGroup{User: "test", Groups: []string{"test"}}
- groupTracker := newGroupTracker(user.User)
+ eventSystem := mock.NewEventSystem()
+ groupTracker := newGroupTracker(user.User, newUGMEvents(eventSystem))
usage1, err := resources.NewResourceFromConf(map[string]string{"mem":
"10M", "vcore": "10"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := groupTracker.increaseTrackedResource(hierarchy1, TestApp1,
usage1, user.User)
+ result := groupTracker.increaseTrackedResource(path1, TestApp1, usage1,
user.User)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v", hierarchy1, TestApp1, usage1)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp1, usage1)
}
- groupTracker.setLimits(hierarchy1, resources.Multiply(usage1, 5), 5)
- groupTracker.setLimits(hierarchy5, resources.Multiply(usage1, 10), 10)
-
- result = groupTracker.increaseTrackedResource(hierarchy1, TestApp2,
usage1, user.User)
+ // higher limits - apps can run
+ eventSystem.Reset()
+ groupTracker.setLimits(path1, resources.Multiply(usage1, 5), 5)
+ groupTracker.setLimits(path5, resources.Multiply(usage1, 10), 10)
+ assert.Equal(t, 2, len(eventSystem.Events))
+ assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT,
eventSystem.Events[0].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_SET,
eventSystem.Events[0].EventChangeType)
+ assert.Equal(t, path1, eventSystem.Events[0].ReferenceID)
+ assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT,
eventSystem.Events[1].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_SET,
eventSystem.Events[1].EventChangeType)
+ assert.Equal(t, path5, eventSystem.Events[1].ReferenceID)
+ result = groupTracker.increaseTrackedResource(path1, TestApp2, usage1,
user.User)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v", hierarchy1, TestApp2, usage1)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp2, usage1)
}
- result = groupTracker.increaseTrackedResource(hierarchy1, TestApp3,
usage1, user.User)
+ result = groupTracker.increaseTrackedResource(path1, TestApp3, usage1,
user.User)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v", hierarchy1, TestApp3, usage1)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp3, usage1)
}
- groupTracker.setLimits(hierarchy1, usage1, 1)
- groupTracker.setLimits(hierarchy5, usage1, 1)
+ path1expectedHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "mem": 20000000,
+ "vcore": 20000,
+ })
+ hierarchy1 := strings.Split(path1, configs.DOT)
+ assert.Assert(t, resources.Equals(groupTracker.headroom(hierarchy1),
path1expectedHeadroom))
+ path5expectedHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "mem": 70000000,
+ "vcore": 70000,
+ })
+ hierarchy5 := strings.Split(path5, configs.DOT)
+ assert.Assert(t, resources.Equals(groupTracker.headroom(hierarchy5),
path5expectedHeadroom))
+ assert.Assert(t, groupTracker.canRunApp(hierarchy1, TestApp4))
+
+ // lower limits
+ groupTracker.setLimits(path1, usage1, 1)
+ groupTracker.setLimits(path5, resources.Multiply(usage1, 2), 1)
+ lowerChildHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "mem": -20000000,
+ "vcore": -20000,
+ })
+ assert.Assert(t, resources.Equals(groupTracker.headroom(hierarchy1),
lowerChildHeadroom))
+ lowerParentHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "mem": -10000000,
+ "vcore": -10000,
+ })
+ assert.Assert(t, resources.Equals(groupTracker.headroom(hierarchy5),
lowerParentHeadroom))
+ assert.Assert(t, !groupTracker.canRunApp(hierarchy1, TestApp4))
+ assert.Assert(t, !groupTracker.canRunApp(hierarchy5, TestApp4))
+
+ // clear limits
+ eventSystem.Reset()
+ groupTracker.clearLimits(path1)
+ assert.Assert(t, resources.Equals(groupTracker.headroom(hierarchy1),
lowerParentHeadroom))
+ assert.Assert(t, resources.Equals(groupTracker.headroom(hierarchy5),
lowerParentHeadroom))
+ assert.Assert(t, !groupTracker.canRunApp(hierarchy1, TestApp4))
+ assert.Assert(t, !groupTracker.canRunApp(hierarchy5, TestApp4))
+ groupTracker.clearLimits(path5)
+ assert.Assert(t, groupTracker.headroom(hierarchy1) == nil)
+ assert.Assert(t, groupTracker.headroom(hierarchy5) == nil)
+ assert.Assert(t, groupTracker.canRunApp(hierarchy1, TestApp4))
+ assert.Assert(t, groupTracker.canRunApp(hierarchy5, TestApp4))
+ assert.Equal(t, 2, len(eventSystem.Events))
+ assert.Equal(t, si.EventRecord_REMOVE,
eventSystem.Events[0].EventChangeType)
+ assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT,
eventSystem.Events[0].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_REMOVE,
eventSystem.Events[1].EventChangeType)
+ assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT,
eventSystem.Events[1].EventChangeDetail)
}
func getGroupResource(gt *GroupTracker) map[string]*resources.Resource {
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index a4cff214..f0a76d56 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
+ "github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/log"
)
@@ -45,6 +46,7 @@ type Manager struct {
configuredGroups map[string][]string // Hold
groups for all configured queue paths.
userLimits map[string]map[string]*LimitConfig // Holds
queue path * user limit config
groupLimits map[string]map[string]*LimitConfig // Holds
queue path * group limit config
+ events *ugmEvents
sync.RWMutex
}
@@ -54,6 +56,7 @@ func newManager() *Manager {
groupTrackers: make(map[string]*GroupTracker),
userWildCardLimitsConfig: make(map[string]*LimitConfig),
groupWildCardLimitsConfig: make(map[string]*LimitConfig),
+ events:
newUGMEvents(events.GetEventSystem()),
}
return manager
}
@@ -93,7 +96,8 @@ func (m *Manager) IncreaseTrackedResource(queuePath,
applicationID string, usage
if !userTracker.hasGroupForApp(applicationID) {
m.ensureGroupTrackerForApp(queuePath, applicationID, user)
}
- return userTracker.increaseTrackedResource(strings.Split(queuePath,
configs.DOT), applicationID, usage)
+
+ return userTracker.increaseTrackedResource(queuePath, applicationID,
usage)
}
// DecreaseTrackedResource Decrease the resource usage for the given user
group and queue path combination.
@@ -107,7 +111,6 @@ func (m *Manager) DecreaseTrackedResource(queuePath,
applicationID string, usage
zap.String("application", applicationID),
zap.Stringer("resource", usage),
zap.Bool("removeApp", removeApp))
- hierarchy := strings.Split(queuePath, configs.DOT)
if queuePath == common.Empty || applicationID == common.Empty || usage
== nil || user.User == common.Empty {
log.Log(log.SchedUGM).Debug("Mandatory parameters are missing
to decrease the resource usage")
return false
@@ -119,6 +122,7 @@ func (m *Manager) DecreaseTrackedResource(queuePath,
applicationID string, usage
zap.String("user", user.User))
return false
}
+
// get the group now as the decrease might remove the app from the user
if removeApp is true
appGroup := userTracker.getGroupForApp(applicationID)
log.Log(log.SchedUGM).Debug("Decreasing resource usage for user",
@@ -128,7 +132,7 @@ func (m *Manager) DecreaseTrackedResource(queuePath,
applicationID string, usage
zap.String("tracked group", appGroup),
zap.Stringer("resource", usage),
zap.Bool("removeApp", removeApp))
- removeQT, decreased := userTracker.decreaseTrackedResource(hierarchy,
applicationID, usage, removeApp)
+ removeQT, decreased := userTracker.decreaseTrackedResource(queuePath,
applicationID, usage, removeApp)
if !decreased {
return decreased
}
@@ -154,7 +158,7 @@ func (m *Manager) DecreaseTrackedResource(queuePath,
applicationID string, usage
zap.String("application", applicationID),
zap.Stringer("resource", usage),
zap.Bool("removeApp", removeApp))
- removeQT, decreased = groupTracker.decreaseTrackedResource(hierarchy,
applicationID, usage, removeApp)
+ removeQT, decreased = groupTracker.decreaseTrackedResource(queuePath,
applicationID, usage, removeApp)
if !decreased {
return decreased
}
@@ -222,7 +226,7 @@ func (m *Manager) ensureGroupTrackerForApp(queuePath,
applicationID string, user
log.Log(log.SchedUGM).Info("Group tracker doesn't
exists. Creating appGroup tracker",
zap.String("queue path", queuePath),
zap.String("appGroup", appGroup))
- groupTracker = newGroupTracker(appGroup)
+ groupTracker = newGroupTracker(appGroup, m.events)
m.Lock()
m.groupTrackers[appGroup] = groupTracker
m.Unlock()
@@ -311,7 +315,6 @@ func (m *Manager) UpdateConfig(config configs.QueueConfig,
queuePath string) err
func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath
string, newUserLimits map[string]map[string]*LimitConfig, newGroupLimits
map[string]map[string]*LimitConfig,
newUserWildCardLimitsConfig map[string]*LimitConfig,
newGroupWildCardLimitsConfig map[string]*LimitConfig, newConfiguredGroups
map[string][]string) error {
- hierarchy := strings.Split(queuePath, configs.DOT)
// Traverse limits of specific queue path
for _, limit := range cur.Limits {
var maxResource *resources.Resource
@@ -338,7 +341,7 @@ func (m *Manager) internalProcessConfig(cur
configs.QueueConfig, queuePath strin
newUserWildCardLimitsConfig[queuePath] =
limitConfig
continue
}
- if err := m.setUserLimits(user, limitConfig,
hierarchy); err != nil {
+ if err := m.setUserLimits(user, limitConfig,
queuePath); err != nil {
return err
}
if _, ok := newUserLimits[queuePath]; !ok {
@@ -356,7 +359,7 @@ func (m *Manager) internalProcessConfig(cur
configs.QueueConfig, queuePath strin
zap.String("queue path", queuePath),
zap.Uint64("max application",
limit.MaxApplications),
zap.Any("max resources", limit.MaxResources))
- if err := m.setGroupLimits(group, limitConfig,
hierarchy); err != nil {
+ if err := m.setGroupLimits(group, limitConfig,
queuePath); err != nil {
return err
}
if _, ok := newGroupLimits[queuePath]; !ok {
@@ -402,7 +405,6 @@ func (m *Manager)
clearEarlierSetUserWildCardLimits(newUserWildCardLimits map[st
m.RLock()
defer m.RUnlock()
for queuePath, currentLimitConfig := range m.userWildCardLimitsConfig {
- hierarchy := strings.Split(queuePath, configs.DOT)
_, currentQPExists := m.userLimits[queuePath]
_, newQPExists := newUserLimits[queuePath]
@@ -414,7 +416,7 @@ func (m *Manager)
clearEarlierSetUserWildCardLimits(newUserWildCardLimits map[st
log.Log(log.SchedUGM).Debug("Need to
clear earlier set configs for user because wild card limit has been applied
earlier",
zap.String("user", ut.userName),
zap.String("queue path",
queuePath))
- ut.setLimits(hierarchy, nil, 0, false,
true)
+ ut.clearLimits(queuePath, true)
}
}
} else if !currentQPExists || !newQPExists {
@@ -428,7 +430,7 @@ func (m *Manager)
clearEarlierSetUserWildCardLimits(newUserWildCardLimits map[st
zap.String("queue path",
queuePath))
_, exists :=
m.userLimits[queuePath][ut.userName]
if _, ok =
newUserLimits[queuePath][ut.userName]; !ok || !exists {
- ut.setLimits(hierarchy,
newLimitConfig.maxResources, newLimitConfig.maxApplications, true, true)
+ ut.setLimits(queuePath,
newLimitConfig.maxResources, newLimitConfig.maxApplications, true, true)
}
}
}
@@ -443,7 +445,7 @@ func (m *Manager)
applyWildCardUserLimits(newUserWildCardLimits map[string]*Limi
for queuePath, newLimitConfig := range newUserWildCardLimits {
for _, ut := range m.userTrackers {
if _, ok := newUserLimits[queuePath][ut.userName]; !ok {
- ut.setLimits(strings.Split(queuePath, "."),
newLimitConfig.maxResources, newLimitConfig.maxApplications, true, false)
+ ut.setLimits(queuePath,
newLimitConfig.maxResources, newLimitConfig.maxApplications, true, false)
}
}
}
@@ -453,12 +455,11 @@ func (m *Manager)
applyWildCardUserLimits(newUserWildCardLimits map[string]*Limi
// by comparing with the existing config. Reset earlier usage only config set
earlier but not now
func (m *Manager) clearEarlierSetUserLimits(newUserLimits
map[string]map[string]*LimitConfig) {
for queuePath, limitConfig := range m.userLimits {
- hierarchy := strings.Split(queuePath, configs.DOT)
// Is queue path exists?
if newUserLimit, ok := newUserLimits[queuePath]; !ok {
for u := range limitConfig {
if ut, utExists := m.userTrackers[u]; utExists {
- m.resetUserEarlierUsage(ut, hierarchy)
+ m.resetUserEarlierUsage(ut, queuePath)
}
}
} else {
@@ -466,7 +467,7 @@ func (m *Manager) clearEarlierSetUserLimits(newUserLimits
map[string]map[string]
for u := range limitConfig {
if _, ulExists := newUserLimit[u]; !ulExists {
if ut, utExists := m.userTrackers[u];
utExists {
- m.resetUserEarlierUsage(ut,
hierarchy)
+ m.resetUserEarlierUsage(ut,
queuePath)
}
}
}
@@ -477,13 +478,14 @@ func (m *Manager) clearEarlierSetUserLimits(newUserLimits
map[string]map[string]
// resetUserEarlierUsage Clear or reset earlier usage only when user already
tracked for the queue path.
// Reset the max apps and max resources to default, unlink the end leaf queue
of queue path from its immediate parent and
// eventually remove user tracker object itself from ugm if it can be removed.
-func (m *Manager) resetUserEarlierUsage(ut *UserTracker, hierarchy []string) {
+func (m *Manager) resetUserEarlierUsage(ut *UserTracker, queuePath string) {
// Is this user already tracked for the queue path?
+ hierarchy := strings.Split(queuePath, configs.DOT)
if ut.IsQueuePathTrackedCompletely(hierarchy) {
log.Log(log.SchedUGM).Debug("Need to clear earlier set configs
for user",
zap.String("user", ut.userName),
zap.Strings("queue path", hierarchy))
- ut.setLimits(hierarchy, nil, 0, false, false)
+ ut.clearLimits(queuePath, false)
// Is there any running applications in end queue of this queue
path? If not, then remove the linkage between end queue and its immediate parent
if ut.IsUnlinkRequired(hierarchy) {
ut.UnlinkQT(hierarchy)
@@ -501,12 +503,11 @@ func (m *Manager) resetUserEarlierUsage(ut *UserTracker,
hierarchy []string) {
// by comparing with the existing config. Reset earlier usage only config set
earlier but not now
func (m *Manager) clearEarlierSetGroupLimits(newGroupLimits
map[string]map[string]*LimitConfig) {
for queuePath, limitConfig := range m.groupLimits {
- hierarchy := strings.Split(queuePath, configs.DOT)
// Is queue path exists?
if newGroupLimit, ok := newGroupLimits[queuePath]; !ok {
for g := range limitConfig {
if gt, gtExists := m.groupTrackers[g]; gtExists
{
- m.resetGroupEarlierUsage(gt, hierarchy)
+ m.resetGroupEarlierUsage(gt, queuePath)
}
}
} else {
@@ -514,7 +515,7 @@ func (m *Manager) clearEarlierSetGroupLimits(newGroupLimits
map[string]map[strin
for g := range limitConfig {
if _, glExists := newGroupLimit[g]; !glExists {
if gt, gtExists := m.groupTrackers[g];
gtExists {
- m.resetGroupEarlierUsage(gt,
hierarchy)
+ m.resetGroupEarlierUsage(gt,
queuePath)
}
}
}
@@ -526,7 +527,8 @@ func (m *Manager) clearEarlierSetGroupLimits(newGroupLimits
map[string]map[strin
// Decrease the group usage and collect the list of applications for which
user app group linkage needs to be broken.
// Reset the max apps and max resources to default, unlink the end leaf queue
of queue path from its immediate parent and
// eventually remove group tracker object itself from ugm if it can be removed.
-func (m *Manager) resetGroupEarlierUsage(gt *GroupTracker, hierarchy []string)
{
+func (m *Manager) resetGroupEarlierUsage(gt *GroupTracker, queuePath string) {
+ hierarchy := strings.Split(queuePath, configs.DOT)
if gt.IsQueuePathTrackedCompletely(hierarchy) {
log.Log(log.SchedUGM).Debug("Need to clear earlier set configs
for group",
zap.String("group", gt.groupName),
@@ -536,7 +538,7 @@ func (m *Manager) resetGroupEarlierUsage(gt *GroupTracker,
hierarchy []string) {
ut := m.userTrackers[u]
delete(ut.appGroupTrackers, app)
}
- gt.setLimits(hierarchy, nil, 0)
+ gt.clearLimits(queuePath)
// Is there any running applications in end queue of this queue
path? If not, then remove the linkage between end queue and its immediate parent
if gt.IsUnlinkRequired(hierarchy) {
gt.UnlinkQT(hierarchy)
@@ -561,43 +563,43 @@ func (m *Manager) replaceLimitConfigs(newUserLimits
map[string]map[string]*Limit
m.configuredGroups = newConfiguredGroups
}
-func (m *Manager) setUserLimits(user string, limitConfig *LimitConfig,
hierarchy []string) error {
+func (m *Manager) setUserLimits(user string, limitConfig *LimitConfig,
queuePath string) error {
m.Lock()
defer m.Unlock()
log.Log(log.SchedUGM).Debug("Setting user limits",
zap.String("user", user),
- zap.Strings("queue path", hierarchy),
+ zap.String("queue path", queuePath),
zap.Uint64("max application", limitConfig.maxApplications),
zap.Stringer("max resources", limitConfig.maxResources))
userTracker, ok := m.userTrackers[user]
if !ok {
log.Log(log.SchedUGM).Debug("User tracker does not exist.
Creating user tracker object to set the limit configuration",
zap.String("user", user),
- zap.Strings("queue path", hierarchy))
- userTracker = newUserTracker(user)
+ zap.String("queue path", queuePath))
+ userTracker = newUserTracker(user, m.events)
m.userTrackers[user] = userTracker
}
- userTracker.setLimits(hierarchy, limitConfig.maxResources,
limitConfig.maxApplications, false, false)
+ userTracker.setLimits(queuePath, limitConfig.maxResources,
limitConfig.maxApplications, false, false)
return nil
}
-func (m *Manager) setGroupLimits(group string, limitConfig *LimitConfig,
hierarchy []string) error {
+func (m *Manager) setGroupLimits(group string, limitConfig *LimitConfig,
queuePath string) error {
m.Lock()
defer m.Unlock()
log.Log(log.SchedUGM).Debug("Setting group limits",
zap.String("group", group),
- zap.Strings("queue path", hierarchy),
+ zap.String("queue path", queuePath),
zap.Uint64("max application", limitConfig.maxApplications),
zap.Stringer("max resources", limitConfig.maxResources))
groupTracker, ok := m.groupTrackers[group]
if !ok {
log.Log(log.SchedUGM).Debug("Group tracker does not exist.
Creating group tracker object to set the limit configuration",
zap.String("group", group),
- zap.Strings("queue path", hierarchy))
- groupTracker = newGroupTracker(group)
+ zap.String("queue path", queuePath))
+ groupTracker = newGroupTracker(group, m.events)
m.groupTrackers[group] = groupTracker
}
- groupTracker.setLimits(hierarchy, limitConfig.maxResources,
limitConfig.maxApplications)
+ groupTracker.setLimits(queuePath, limitConfig.maxResources,
limitConfig.maxApplications)
return nil
}
@@ -612,7 +614,7 @@ func (m *Manager) getUserTracker(user string) *UserTracker {
}
log.Log(log.SchedUGM).Info("User tracker doesn't exists. Creating user
tracker.",
zap.String("user", user))
- userTracker := newUserTracker(user)
+ userTracker := newUserTracker(user, m.events)
m.userTrackers[user] = userTracker
return userTracker
}
diff --git a/pkg/scheduler/ugm/ugm_events.go b/pkg/scheduler/ugm/ugm_events.go
new file mode 100644
index 00000000..712253e4
--- /dev/null
+++ b/pkg/scheduler/ugm/ugm_events.go
@@ -0,0 +1,116 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ugm
+
+import (
+ "github.com/apache/yunikorn-core/pkg/common"
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/events"
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+type ugmEvents struct {
+ eventSystem events.EventSystem
+}
+
+func (evt *ugmEvents) sendIncResourceUsageForUser(user, queuePath string,
allocated *resources.Resource) {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateUserGroupEventRecord(user, common.Empty,
queuePath, si.EventRecord_ADD, si.EventRecord_UG_USER_RESOURCE, allocated)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendIncResourceUsageForGroup(group, queuePath string,
allocated *resources.Resource) {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateUserGroupEventRecord(group, common.Empty,
queuePath, si.EventRecord_ADD, si.EventRecord_UG_GROUP_RESOURCE, allocated)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendDecResourceUsageForUser(user, queuePath string,
allocated *resources.Resource) {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateUserGroupEventRecord(user, common.Empty,
queuePath, si.EventRecord_REMOVE, si.EventRecord_UG_USER_RESOURCE, allocated)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendDecResourceUsageForGroup(group, queuePath string,
allocated *resources.Resource) {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateUserGroupEventRecord(group, common.Empty,
queuePath, si.EventRecord_REMOVE, si.EventRecord_UG_GROUP_RESOURCE, allocated)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendAppGroupLinked(group, applicationID string) {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateUserGroupEventRecord(group, common.Empty,
applicationID, si.EventRecord_SET, si.EventRecord_UG_APP_LINK, nil)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendAppGroupUnlinked(group, applicationID string) {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateUserGroupEventRecord(group, common.Empty,
applicationID, si.EventRecord_REMOVE, si.EventRecord_UG_APP_LINK, nil)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendLimitSetForUser(user, queuePath string) {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateUserGroupEventRecord(user, common.Empty,
queuePath, si.EventRecord_SET, si.EventRecord_UG_USER_LIMIT, nil)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendLimitSetForGroup(group, queuePath string) {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateUserGroupEventRecord(group, common.Empty,
queuePath, si.EventRecord_SET, si.EventRecord_UG_GROUP_LIMIT, nil)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendLimitRemoveForUser(user, queuePath string) {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateUserGroupEventRecord(user, common.Empty,
queuePath, si.EventRecord_REMOVE, si.EventRecord_UG_USER_LIMIT, nil)
+ evt.eventSystem.AddEvent(event)
+}
+
+func (evt *ugmEvents) sendLimitRemoveForGroup(group, queuePath string) {
+ if !evt.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateUserGroupEventRecord(group, common.Empty,
queuePath, si.EventRecord_REMOVE, si.EventRecord_UG_GROUP_LIMIT, nil)
+ evt.eventSystem.AddEvent(event)
+}
+
+func newUGMEvents(evt events.EventSystem) *ugmEvents {
+ return &ugmEvents{
+ eventSystem: evt,
+ }
+}
diff --git a/pkg/scheduler/ugm/ugm_events_test.go
b/pkg/scheduler/ugm/ugm_events_test.go
new file mode 100644
index 00000000..7d9cc9cd
--- /dev/null
+++ b/pkg/scheduler/ugm/ugm_events_test.go
@@ -0,0 +1,239 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ugm
+
+import (
+ "testing"
+
+ "gotest.tools/v3/assert"
+
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/events/mock"
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+var usage = resources.NewResourceFromMap(map[string]resources.Quantity{"cpu":
123})
+
+func TestIncUsageUser(t *testing.T) {
+ eventSystem := mock.NewEventSystemDisabled()
+ events := newUGMEvents(eventSystem)
+ events.sendIncResourceUsageForUser("user1", path1, usage)
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ eventSystem = mock.NewEventSystem()
+ events = newUGMEvents(eventSystem)
+ events.sendIncResourceUsageForUser("user1", path1, usage)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, "user1", event.ObjectID)
+ assert.Equal(t, path1, event.ReferenceID)
+ assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+ assert.Equal(t, si.EventRecord_UG_USER_RESOURCE,
event.EventChangeDetail)
+ assert.Equal(t, si.EventRecord_ADD, event.EventChangeType)
+ assert.Equal(t, "", event.Message)
+ assert.Assert(t, event.Resource != nil)
+ assert.Equal(t, 1, len(event.Resource.Resources))
+ assert.Equal(t, int64(123), event.Resource.Resources["cpu"].Value)
+}
+
+func TestIncUsageGroup(t *testing.T) {
+ eventSystem := mock.NewEventSystemDisabled()
+ events := newUGMEvents(eventSystem)
+ events.sendIncResourceUsageForGroup("group1", path1, usage)
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ eventSystem = mock.NewEventSystem()
+ events = newUGMEvents(eventSystem)
+ events.sendIncResourceUsageForGroup("group1", path1, usage)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, "group1", event.ObjectID)
+ assert.Equal(t, path1, event.ReferenceID)
+ assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+ assert.Equal(t, si.EventRecord_UG_GROUP_RESOURCE,
event.EventChangeDetail)
+ assert.Equal(t, si.EventRecord_ADD, event.EventChangeType)
+ assert.Equal(t, "", event.Message)
+ assert.Assert(t, event.Resource != nil)
+ assert.Equal(t, 1, len(event.Resource.Resources))
+ assert.Equal(t, int64(123), event.Resource.Resources["cpu"].Value)
+}
+
+func TestDecUsageUser(t *testing.T) {
+ eventSystem := mock.NewEventSystemDisabled()
+ events := newUGMEvents(eventSystem)
+ events.sendDecResourceUsageForUser("user1", path1, usage)
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ eventSystem = mock.NewEventSystem()
+ events = newUGMEvents(eventSystem)
+ events.sendDecResourceUsageForUser("user1", path1, usage)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, "user1", event.ObjectID)
+ assert.Equal(t, path1, event.ReferenceID)
+ assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+ assert.Equal(t, si.EventRecord_UG_USER_RESOURCE,
event.EventChangeDetail)
+ assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
+ assert.Equal(t, "", event.Message)
+ assert.Assert(t, event.Resource != nil)
+ assert.Equal(t, 1, len(event.Resource.Resources))
+ assert.Equal(t, int64(123), event.Resource.Resources["cpu"].Value)
+}
+
+func TestDecUsageGroup(t *testing.T) {
+ eventSystem := mock.NewEventSystemDisabled()
+ events := newUGMEvents(eventSystem)
+ events.sendDecResourceUsageForGroup("group1", path1, usage)
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ eventSystem = mock.NewEventSystem()
+ events = newUGMEvents(eventSystem)
+ events.sendDecResourceUsageForGroup("group1", path1, usage)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, "group1", event.ObjectID)
+ assert.Equal(t, path1, event.ReferenceID)
+ assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+ assert.Equal(t, si.EventRecord_UG_GROUP_RESOURCE,
event.EventChangeDetail)
+ assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
+ assert.Equal(t, "", event.Message)
+ assert.Assert(t, event.Resource != nil)
+ assert.Equal(t, 1, len(event.Resource.Resources))
+ assert.Equal(t, int64(123), event.Resource.Resources["cpu"].Value)
+}
+
+func TestAppGroupLinked(t *testing.T) {
+ eventSystem := mock.NewEventSystemDisabled()
+ events := newUGMEvents(eventSystem)
+ events.sendAppGroupLinked("group1", "app1")
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ eventSystem = mock.NewEventSystem()
+ events = newUGMEvents(eventSystem)
+ events.sendAppGroupLinked("group1", "app1")
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, "group1", event.ObjectID)
+ assert.Equal(t, "app1", event.ReferenceID)
+ assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+ assert.Equal(t, si.EventRecord_UG_APP_LINK, event.EventChangeDetail)
+ assert.Equal(t, si.EventRecord_SET, event.EventChangeType)
+ assert.Equal(t, "", event.Message)
+ assert.Equal(t, 0, len(event.Resource.Resources))
+}
+
+func TestAppGroupUnlinked(t *testing.T) {
+ eventSystem := mock.NewEventSystemDisabled()
+ events := newUGMEvents(eventSystem)
+ events.sendAppGroupUnlinked("group1", "app1")
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ eventSystem = mock.NewEventSystem()
+ events = newUGMEvents(eventSystem)
+ events.sendAppGroupUnlinked("group1", "app1")
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, "group1", event.ObjectID)
+ assert.Equal(t, "app1", event.ReferenceID)
+ assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+ assert.Equal(t, si.EventRecord_UG_APP_LINK, event.EventChangeDetail)
+ assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
+ assert.Equal(t, "", event.Message)
+ assert.Equal(t, 0, len(event.Resource.Resources))
+}
+
+func TestUserLimitSet(t *testing.T) {
+ eventSystem := mock.NewEventSystemDisabled()
+ events := newUGMEvents(eventSystem)
+ events.sendLimitSetForUser("user1", path1)
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ eventSystem = mock.NewEventSystem()
+ events = newUGMEvents(eventSystem)
+ events.sendLimitSetForUser("user1", path1)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, "user1", event.ObjectID)
+ assert.Equal(t, path1, event.ReferenceID)
+ assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+ assert.Equal(t, si.EventRecord_UG_USER_LIMIT, event.EventChangeDetail)
+ assert.Equal(t, si.EventRecord_SET, event.EventChangeType)
+ assert.Equal(t, "", event.Message)
+ assert.Equal(t, 0, len(event.Resource.Resources))
+}
+
+func TestUserLimitCleared(t *testing.T) {
+ eventSystem := mock.NewEventSystemDisabled()
+ events := newUGMEvents(eventSystem)
+ events.sendLimitRemoveForUser("user1", path1)
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ eventSystem = mock.NewEventSystem()
+ events = newUGMEvents(eventSystem)
+ events.sendLimitRemoveForUser("user1", path1)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, "user1", event.ObjectID)
+ assert.Equal(t, path1, event.ReferenceID)
+ assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+ assert.Equal(t, si.EventRecord_UG_USER_LIMIT, event.EventChangeDetail)
+ assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
+ assert.Equal(t, "", event.Message)
+ assert.Equal(t, 0, len(event.Resource.Resources))
+}
+
+func TestGroupLimitSet(t *testing.T) {
+ eventSystem := mock.NewEventSystemDisabled()
+ events := newUGMEvents(eventSystem)
+ events.sendLimitSetForGroup("group1", path1)
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ eventSystem = mock.NewEventSystem()
+ events = newUGMEvents(eventSystem)
+ events.sendLimitSetForGroup("group1", path1)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, "group1", event.ObjectID)
+ assert.Equal(t, path1, event.ReferenceID)
+ assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+ assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT, event.EventChangeDetail)
+ assert.Equal(t, si.EventRecord_SET, event.EventChangeType)
+ assert.Equal(t, "", event.Message)
+ assert.Equal(t, 0, len(event.Resource.Resources))
+}
+
+func TestGroupLimitCleared(t *testing.T) {
+ eventSystem := mock.NewEventSystemDisabled()
+ events := newUGMEvents(eventSystem)
+ events.sendLimitRemoveForGroup("group1", path1)
+ assert.Equal(t, 0, len(eventSystem.Events))
+
+ eventSystem = mock.NewEventSystem()
+ events = newUGMEvents(eventSystem)
+ events.sendLimitRemoveForGroup("group1", path1)
+ assert.Equal(t, 1, len(eventSystem.Events))
+ event := eventSystem.Events[0]
+ assert.Equal(t, "group1", event.ObjectID)
+ assert.Equal(t, path1, event.ReferenceID)
+ assert.Equal(t, si.EventRecord_USERGROUP, event.Type)
+ assert.Equal(t, si.EventRecord_UG_GROUP_LIMIT, event.EventChangeDetail)
+ assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
+ assert.Equal(t, "", event.Message)
+ assert.Equal(t, 0, len(event.Resource.Resources))
+}
diff --git a/pkg/scheduler/ugm/user_tracker.go
b/pkg/scheduler/ugm/user_tracker.go
index b93b5396..1f1ecfee 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -19,11 +19,13 @@
package ugm
import (
+ "strings"
"sync"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common"
+ "github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
@@ -38,37 +40,41 @@ type UserTracker struct {
// and group tracker object as value.
appGroupTrackers map[string]*GroupTracker
queueTracker *QueueTracker // Holds the actual resource usage of
queue path where application runs
+ events *ugmEvents
sync.RWMutex
}
-func newUserTracker(userName string) *UserTracker {
+func newUserTracker(userName string, ugmEvents *ugmEvents) *UserTracker {
queueTracker := newRootQueueTracker(user)
userTracker := &UserTracker{
userName: userName,
appGroupTrackers: make(map[string]*GroupTracker),
queueTracker: queueTracker,
+ events: ugmEvents,
}
return userTracker
}
-func (ut *UserTracker) increaseTrackedResource(hierarchy []string,
applicationID string, usage *resources.Resource) bool {
+func (ut *UserTracker) increaseTrackedResource(queuePath string, applicationID
string, usage *resources.Resource) bool {
ut.Lock()
defer ut.Unlock()
+ hierarchy := strings.Split(queuePath, configs.DOT)
+ ut.events.sendIncResourceUsageForUser(ut.userName, queuePath, usage)
increased := ut.queueTracker.increaseTrackedResource(hierarchy,
applicationID, user, usage)
if increased {
gt := ut.appGroupTrackers[applicationID]
log.Log(log.SchedUGM).Debug("Increasing resource usage for
group",
zap.String("group", gt.getName()),
- zap.Strings("queue path", hierarchy),
+ zap.String("queue path", queuePath),
zap.String("application", applicationID),
zap.Stringer("resource", usage))
- increasedGroupUsage := gt.increaseTrackedResource(hierarchy,
applicationID, usage, ut.userName)
+ increasedGroupUsage := gt.increaseTrackedResource(queuePath,
applicationID, usage, ut.userName)
if !increasedGroupUsage {
_, decreased :=
ut.queueTracker.decreaseTrackedResource(hierarchy, applicationID, usage, false)
if !decreased {
log.Log(log.SchedUGM).Error("User resource
usage rollback has failed",
- zap.Strings("queue path", hierarchy),
+ zap.String("queue path", queuePath),
zap.String("application",
applicationID),
zap.String("user", ut.userName))
}
@@ -78,13 +84,19 @@ func (ut *UserTracker) increaseTrackedResource(hierarchy
[]string, applicationID
return increased
}
-func (ut *UserTracker) decreaseTrackedResource(hierarchy []string,
applicationID string, usage *resources.Resource, removeApp bool) (bool, bool) {
+func (ut *UserTracker) decreaseTrackedResource(queuePath string, applicationID
string, usage *resources.Resource, removeApp bool) (bool, bool) {
ut.Lock()
defer ut.Unlock()
+ ut.events.sendDecResourceUsageForUser(ut.userName, queuePath, usage)
if removeApp {
+ tracker := ut.appGroupTrackers[applicationID]
+ if tracker != nil {
+ appGroup := tracker.groupName
+ ut.events.sendAppGroupUnlinked(appGroup, applicationID)
+ }
delete(ut.appGroupTrackers, applicationID)
}
- return ut.queueTracker.decreaseTrackedResource(hierarchy,
applicationID, usage, removeApp)
+ return ut.queueTracker.decreaseTrackedResource(strings.Split(queuePath,
configs.DOT), applicationID, usage, removeApp)
}
func (ut *UserTracker) hasGroupForApp(applicationID string) bool {
@@ -97,6 +109,9 @@ func (ut *UserTracker) hasGroupForApp(applicationID string)
bool {
func (ut *UserTracker) setGroupForApp(applicationID string, groupTrack
*GroupTracker) {
ut.Lock()
defer ut.Unlock()
+ if groupTrack != nil {
+ ut.events.sendAppGroupLinked(groupTrack.groupName,
applicationID)
+ }
ut.appGroupTrackers[applicationID] = groupTrack
}
@@ -115,10 +130,18 @@ func (ut *UserTracker) getTrackedApplications()
map[string]*GroupTracker {
return ut.appGroupTrackers
}
-func (ut *UserTracker) setLimits(hierarchy []string, resource
*resources.Resource, maxApps uint64, useWildCard bool, doWildCardCheck bool) {
+func (ut *UserTracker) setLimits(queuePath string, resource
*resources.Resource, maxApps uint64, useWildCard bool, doWildCardCheck bool) {
+ ut.Lock()
+ defer ut.Unlock()
+ ut.events.sendLimitSetForUser(ut.userName, queuePath)
+ ut.queueTracker.setLimit(strings.Split(queuePath, configs.DOT),
resource, maxApps, useWildCard, user, doWildCardCheck)
+}
+
+func (ut *UserTracker) clearLimits(queuePath string, doWildCardCheck bool) {
ut.Lock()
defer ut.Unlock()
- ut.queueTracker.setLimit(hierarchy, resource, maxApps, useWildCard,
user, doWildCardCheck)
+ ut.events.sendLimitRemoveForUser(ut.userName, queuePath)
+ ut.queueTracker.setLimit(strings.Split(queuePath, configs.DOT), nil, 0,
false, user, doWildCardCheck)
}
func (ut *UserTracker) headroom(hierarchy []string) *resources.Resource {
diff --git a/pkg/scheduler/ugm/user_tracker_test.go
b/pkg/scheduler/ugm/user_tracker_test.go
index 258fb15d..28db413f 100644
--- a/pkg/scheduler/ugm/user_tracker_test.go
+++ b/pkg/scheduler/ugm/user_tracker_test.go
@@ -19,12 +19,16 @@
package ugm
import (
+ "strings"
"testing"
"gotest.tools/v3/assert"
+ "github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
+ "github.com/apache/yunikorn-core/pkg/events/mock"
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
const (
@@ -39,11 +43,13 @@ const (
queuePath4 = "root.parent.child12"
)
-var hierarchy1 = []string{"root", "parent", "child1"}
-var hierarchy2 = []string{"root", "parent", "child2"}
-var hierarchy3 = []string{"root", "parent", "child1", "child12"}
-var hierarchy4 = []string{"root", "parent", "child12"}
-var hierarchy5 = []string{"root", "parent"}
+const (
+ path1 = "root.parent.child1"
+ path2 = "root.parent.child2"
+ path3 = "root.parent.child1.child12"
+ path4 = "root.parent.child12"
+ path5 = "root.parent"
+)
func TestIncreaseTrackedResource(t *testing.T) {
// Queue setup:
@@ -53,35 +59,45 @@ func TestIncreaseTrackedResource(t *testing.T) {
// Initialize ugm
GetUserManager()
user := security.UserGroup{User: "test", Groups: []string{"test"}}
- userTracker := newUserTracker(user.User)
+ eventSystem := mock.NewEventSystem()
+ userTracker := newUserTracker(user.User, newUGMEvents(eventSystem))
usage1, err := resources.NewResourceFromConf(map[string]string{"mem":
"10M", "vcore": "10"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := userTracker.increaseTrackedResource(hierarchy1, TestApp1,
usage1)
+ result := userTracker.increaseTrackedResource(path1, TestApp1, usage1)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v", hierarchy1, TestApp1, usage1)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp1, usage1)
}
- groupTracker := newGroupTracker(user.User)
+ groupTracker := newGroupTracker(user.User,
newUGMEvents(mock.NewEventSystemDisabled()))
userTracker.setGroupForApp(TestApp1, groupTracker)
-
usage2, err := resources.NewResourceFromConf(map[string]string{"mem":
"20M", "vcore": "20"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = userTracker.increaseTrackedResource(hierarchy2, TestApp2,
usage2)
+ result = userTracker.increaseTrackedResource(path2, TestApp2, usage2)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy2, TestApp2, usage2, err)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path2, TestApp2, usage2, err)
}
+ assert.Equal(t, 3, len(eventSystem.Events))
+ assert.Equal(t, si.EventRecord_UG_USER_RESOURCE,
eventSystem.Events[0].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_ADD,
eventSystem.Events[0].EventChangeType)
+ assert.Equal(t, si.EventRecord_UG_APP_LINK,
eventSystem.Events[1].EventChangeDetail)
+ assert.Equal(t, "test", eventSystem.Events[1].ObjectID)
+ assert.Equal(t, TestApp1, eventSystem.Events[1].ReferenceID)
+ assert.Equal(t, si.EventRecord_SET,
eventSystem.Events[1].EventChangeType)
+ assert.Equal(t, si.EventRecord_UG_USER_RESOURCE,
eventSystem.Events[2].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_ADD,
eventSystem.Events[2].EventChangeType)
+
userTracker.setGroupForApp(TestApp2, groupTracker)
usage3, err := resources.NewResourceFromConf(map[string]string{"mem":
"30M", "vcore": "30"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- result = userTracker.increaseTrackedResource(hierarchy3, TestApp3,
usage3)
+ result = userTracker.increaseTrackedResource(path3, TestApp3, usage3)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy3, TestApp3, usage3, err)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path3, TestApp3, usage3, err)
}
userTracker.setGroupForApp(TestApp3, groupTracker)
@@ -89,9 +105,9 @@ func TestIncreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- result = userTracker.increaseTrackedResource(hierarchy4, TestApp4,
usage4)
+ result = userTracker.increaseTrackedResource(path4, TestApp4, usage4)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy4, TestApp4, usage4, err)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path4, TestApp4, usage4, err)
}
userTracker.setGroupForApp(TestApp4, groupTracker)
@@ -113,17 +129,18 @@ func TestDecreaseTrackedResource(t *testing.T) {
// Initialize ugm
GetUserManager()
user := security.UserGroup{User: "test", Groups: []string{"test"}}
- userTracker := newUserTracker(user.User)
+ eventSystem := mock.NewEventSystem()
+ userTracker := newUserTracker(user.User, newUGMEvents(eventSystem))
usage1, err := resources.NewResourceFromConf(map[string]string{"mem":
"70M", "vcore": "70"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := userTracker.increaseTrackedResource(hierarchy1, TestApp1,
usage1)
+ result := userTracker.increaseTrackedResource(path1, TestApp1, usage1)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy1, TestApp1, usage1, err)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp1, usage1, err)
}
- groupTracker := newGroupTracker(user.User)
+ groupTracker := newGroupTracker(user.User,
newUGMEvents(mock.NewEventSystemDisabled()))
userTracker.setGroupForApp(TestApp1, groupTracker)
assert.Equal(t, 1, len(userTracker.getTrackedApplications()))
@@ -131,9 +148,9 @@ func TestDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage2)
}
- result = userTracker.increaseTrackedResource(hierarchy2, TestApp2,
usage2)
+ result = userTracker.increaseTrackedResource(path2, TestApp2, usage2)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy2, TestApp2, usage2, err)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path2, TestApp2, usage2, err)
}
userTracker.setGroupForApp(TestApp2, groupTracker)
actualResources := getUserResource(userTracker)
@@ -148,15 +165,18 @@ func TestDecreaseTrackedResource(t *testing.T) {
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- removeQT, decreased := userTracker.decreaseTrackedResource(hierarchy1,
TestApp1, usage3, false)
+ eventSystem.Reset()
+ removeQT, decreased := userTracker.decreaseTrackedResource(path1,
TestApp1, usage3, false)
if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy1, TestApp1, usage3, err)
+ t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp1, usage3, err)
}
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
+ assert.Equal(t, si.EventRecord_UG_USER_RESOURCE,
eventSystem.Events[0].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_REMOVE,
eventSystem.Events[0].EventChangeType)
- removeQT, decreased = userTracker.decreaseTrackedResource(hierarchy2,
TestApp2, usage3, false)
+ removeQT, decreased = userTracker.decreaseTrackedResource(path2,
TestApp2, usage3, false)
if !decreased {
- t.Fatalf("unable to decrease tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy2, TestApp2, usage3, err)
+ t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path2, TestApp2, usage3, err)
}
actualResources1 := getUserResource(userTracker)
@@ -171,55 +191,115 @@ func TestDecreaseTrackedResource(t *testing.T) {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage3)
}
- removeQT, decreased = userTracker.decreaseTrackedResource(hierarchy1,
TestApp1, usage4, true)
+ eventSystem.Reset()
+ removeQT, decreased = userTracker.decreaseTrackedResource(path1,
TestApp1, usage4, true)
if !decreased {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy1, TestApp1, usage1, err)
+ t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp1, usage4, err)
}
assert.Equal(t, 1, len(userTracker.getTrackedApplications()))
assert.Equal(t, removeQT, false, "wrong remove queue tracker value")
+ assert.Equal(t, 2, len(eventSystem.Events))
+ assert.Equal(t, si.EventRecord_UG_USER_RESOURCE,
eventSystem.Events[0].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_REMOVE,
eventSystem.Events[0].EventChangeType)
+ assert.Equal(t, si.EventRecord_UG_APP_LINK,
eventSystem.Events[1].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_REMOVE,
eventSystem.Events[1].EventChangeType)
usage5, err := resources.NewResourceFromConf(map[string]string{"mem":
"10M", "vcore": "10"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage5)
}
- removeQT, decreased = userTracker.decreaseTrackedResource(hierarchy2,
TestApp2, usage5, true)
+ removeQT, decreased = userTracker.decreaseTrackedResource(path2,
TestApp2, usage5, true)
if !decreased {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy2, TestApp2, usage2, err)
+ t.Fatalf("unable to decrease tracked resource: queuepath %s,
app %s, res %v, error %t", path2, TestApp2, usage5, err)
}
assert.Equal(t, 0, len(userTracker.getTrackedApplications()))
assert.Equal(t, removeQT, true, "wrong remove queue tracker value")
}
-func TestSetMaxLimits(t *testing.T) {
+func TestSetAndClearMaxLimits(t *testing.T) {
// Queue setup:
// root->parent->child1
// Initialize ugm
GetUserManager()
user := security.UserGroup{User: "test", Groups: []string{"test"}}
- userTracker := newUserTracker(user.User)
+ eventSystem := mock.NewEventSystem()
+ userTracker := newUserTracker(user.User, newUGMEvents(eventSystem))
usage1, err := resources.NewResourceFromConf(map[string]string{"mem":
"10M", "vcore": "10"})
if err != nil {
t.Errorf("new resource create returned error or wrong resource:
error %t, res %v", err, usage1)
}
- result := userTracker.increaseTrackedResource(hierarchy1, TestApp1,
usage1)
+ result := userTracker.increaseTrackedResource(path1, TestApp1, usage1)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v, error %t", hierarchy1, TestApp1, usage1, err)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v, error %t", path1, TestApp1, usage1, err)
}
- userTracker.setLimits(hierarchy1, resources.Multiply(usage1, 5), 5,
false, false)
- userTracker.setLimits(hierarchy5, resources.Multiply(usage1, 10), 10,
false, false)
+ eventSystem.Reset()
+ userTracker.setLimits(path1, resources.Multiply(usage1, 5), 5, false,
false)
+ userTracker.setLimits(path5, resources.Multiply(usage1, 10), 10, false,
false)
+ assert.Equal(t, 2, len(eventSystem.Events))
+ assert.Equal(t, si.EventRecord_UG_USER_LIMIT,
eventSystem.Events[0].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_SET,
eventSystem.Events[0].EventChangeType)
+ assert.Equal(t, path1, eventSystem.Events[0].ReferenceID)
+ assert.Equal(t, si.EventRecord_UG_USER_LIMIT,
eventSystem.Events[1].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_SET,
eventSystem.Events[1].EventChangeType)
+ assert.Equal(t, path5, eventSystem.Events[1].ReferenceID)
- result = userTracker.increaseTrackedResource(hierarchy1, TestApp1,
usage1)
+ result = userTracker.increaseTrackedResource(path1, TestApp1, usage1)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v", hierarchy1, TestApp1, usage1)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp1, usage1)
}
- result = userTracker.increaseTrackedResource(hierarchy1, TestApp2,
usage1)
+ result = userTracker.increaseTrackedResource(path1, TestApp2, usage1)
if !result {
- t.Fatalf("unable to increase tracked resource: queuepath %+q,
app %s, res %v", hierarchy1, TestApp2, usage1)
+ t.Fatalf("unable to increase tracked resource: queuepath %s,
app %s, res %v", path1, TestApp2, usage1)
}
- userTracker.setLimits(hierarchy1, usage1, 1, false, false)
- userTracker.setLimits(hierarchy5, usage1, 1, false, false)
+ path1expectedHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "mem": 20000000,
+ "vcore": 20000,
+ })
+ hierarchy1 := strings.Split(path1, configs.DOT)
+ assert.Assert(t, resources.Equals(userTracker.headroom(hierarchy1),
path1expectedHeadroom))
+ path5expectedHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "mem": 70000000,
+ "vcore": 70000,
+ })
+ hierarchy5 := strings.Split(path5, configs.DOT)
+ assert.Assert(t, resources.Equals(userTracker.headroom(hierarchy5),
path5expectedHeadroom))
+ assert.Assert(t, userTracker.canRunApp(hierarchy1, TestApp4))
+
+ // lower limits
+ userTracker.setLimits(path1, usage1, 1, false, false)
+ userTracker.setLimits(path5, resources.Multiply(usage1, 2), 1, false,
false)
+ lowerChildHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "mem": -20000000,
+ "vcore": -20000,
+ })
+ assert.Assert(t, resources.Equals(userTracker.headroom(hierarchy1),
lowerChildHeadroom))
+ lowerParentHeadroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{
+ "mem": -10000000,
+ "vcore": -10000,
+ })
+ assert.Assert(t, resources.Equals(userTracker.headroom(hierarchy5),
lowerParentHeadroom))
+ assert.Assert(t, !userTracker.canRunApp(hierarchy1, TestApp4))
+ assert.Assert(t, !userTracker.canRunApp(hierarchy5, TestApp4))
+
+ // clear limits
+ eventSystem.Reset()
+ userTracker.clearLimits(path1, true)
+ assert.Assert(t, resources.Equals(userTracker.headroom(hierarchy1),
lowerParentHeadroom))
+ assert.Assert(t, resources.Equals(userTracker.headroom(hierarchy5),
lowerParentHeadroom))
+ assert.Assert(t, !userTracker.canRunApp(hierarchy1, TestApp4))
+ assert.Assert(t, !userTracker.canRunApp(hierarchy5, TestApp4))
+ userTracker.clearLimits(path5, true)
+ assert.Assert(t, userTracker.headroom(hierarchy1) == nil)
+ assert.Assert(t, userTracker.headroom(hierarchy5) == nil)
+ assert.Assert(t, userTracker.canRunApp(hierarchy1, TestApp4))
+ assert.Assert(t, userTracker.canRunApp(hierarchy5, TestApp4))
+ assert.Equal(t, 2, len(eventSystem.Events))
+ assert.Equal(t, si.EventRecord_REMOVE,
eventSystem.Events[0].EventChangeType)
+ assert.Equal(t, si.EventRecord_UG_USER_LIMIT,
eventSystem.Events[0].EventChangeDetail)
+ assert.Equal(t, si.EventRecord_REMOVE,
eventSystem.Events[1].EventChangeType)
+ assert.Equal(t, si.EventRecord_UG_USER_LIMIT,
eventSystem.Events[1].EventChangeDetail)
}
func getUserResource(ut *UserTracker) map[string]*resources.Resource {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]