This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new 59ce2f7e YUNIKORN-1706 We should clean up failed apps in shim side (#730)
59ce2f7e is described below
commit 59ce2f7e1609b6c2a594c09f008a33e458bf89ea
Author: qzhu <[email protected]>
AuthorDate: Mon Feb 26 16:36:51 2024 +0100
YUNIKORN-1706 We should clean up failed apps in shim side (#730)
Closes: #730
Signed-off-by: Peter Bacsko <[email protected]>
(cherry picked from commit aff0677f1e37f355734f1cfbabf8f23594d2504f)
---
pkg/cache/application.go | 4 ++++
pkg/cache/application_test.go | 4 ++++
pkg/shim/scheduler.go | 7 +++++++
pkg/shim/scheduler_mock_test.go | 7 +++++++
pkg/shim/scheduler_test.go | 10 +++++-----
5 files changed, 27 insertions(+), 5 deletions(-)
diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index 0e6385c5..bb5bba2b 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -320,6 +320,10 @@ func (app *Application) getNonTerminatedTaskAlias() []string {
return nonTerminatedTaskAlias
}
+func (app *Application) AreAllTasksTerminated() bool {
+ return len(app.getNonTerminatedTaskAlias()) == 0
+}
+
// SetState is only for testing
// this is just used for testing, it is not supposed to change state like this
func (app *Application) SetState(state string) {
diff --git a/pkg/cache/application_test.go b/pkg/cache/application_test.go
index e0263c2e..03a572fc 100644
--- a/pkg/cache/application_test.go
+++ b/pkg/cache/application_test.go
@@ -532,6 +532,7 @@ func TestGetNonTerminatedTaskAlias(t *testing.T) {
// app doesn't have any task
res := app.getNonTerminatedTaskAlias()
assert.Equal(t, len(res), 0)
+ assert.Equal(t, app.AreAllTasksTerminated(), true)
pod1 := &v1.Pod{
TypeMeta: apis.TypeMeta{
@@ -566,6 +567,7 @@ func TestGetNonTerminatedTaskAlias(t *testing.T) {
// res should return both task's alias
res = app.getNonTerminatedTaskAlias()
assert.Equal(t, len(res), 2)
+ assert.Equal(t, app.AreAllTasksTerminated(), false)
assert.Assert(t, is.Contains(res, "/test-00001"))
assert.Assert(t, is.Contains(res, "/test-00002"))
@@ -576,12 +578,14 @@ func TestGetNonTerminatedTaskAlias(t *testing.T) {
// res should retuen empty
res = app.getNonTerminatedTaskAlias()
assert.Equal(t, len(res), 0)
+ assert.Equal(t, app.AreAllTasksTerminated(), true)
// set two tasks to one is terminated, another is non-terminated
task1.sm.SetState(TaskStates().Rejected)
task2.sm.SetState(TaskStates().Allocated)
// check the task, should only return task2's alias
res = app.getNonTerminatedTaskAlias()
+ assert.Equal(t, app.AreAllTasksTerminated(), false)
assert.Equal(t, len(res), 1)
assert.Equal(t, res[0], "/test-00002")
}
diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go
index 76a2c717..cb53bb3f 100644
--- a/pkg/shim/scheduler.go
+++ b/pkg/shim/scheduler.go
@@ -158,6 +158,13 @@ func (ss *KubernetesShim) registerShimLayer() error {
func (ss *KubernetesShim) schedule() {
apps := ss.context.GetAllApplications()
for _, app := range apps {
+ if app.GetApplicationState() ==
cache.ApplicationStates().Failed {
+ if app.AreAllTasksTerminated() {
+
ss.context.RemoveApplicationInternal(app.GetApplicationID())
+ }
+ continue
+ }
+
if app.Schedule() {
ss.setOutstandingAppsFound(true)
}
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 54c2f75b..51253b1d 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -128,6 +128,13 @@ func (fc *MockScheduler) addNode(nodeName string, nodeLabels map[string]string,
return fc.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request)
}
+func (fc *MockScheduler) waitForApplicationDeletion(t *testing.T, appID string) {
+ err := utils.WaitForCondition(func() bool {
+ return fc.context.GetApplication(appID) == nil
+ }, time.Second, 5*time.Second)
+ assert.NilError(t, err, "application has not been deleted")
+}
+
func (fc *MockScheduler) waitAndAssertApplicationState(t *testing.T, appID,
expectedState string) {
deadline := time.Now().Add(10 * time.Second)
for {
diff --git a/pkg/shim/scheduler_test.go b/pkg/shim/scheduler_test.go
index bbb1d9ad..9651cd0c 100644
--- a/pkg/shim/scheduler_test.go
+++ b/pkg/shim/scheduler_test.go
@@ -144,11 +144,11 @@ partitions:
// verify app state
cluster.waitAndAssertApplicationState(t, appID,
cache.ApplicationStates().Failed)
- // remove the application
- // remove task first or removeApplication will fail
- cluster.context.RemoveTask(appID, "task0001")
- err = cluster.removeApplication(appID)
- assert.Assert(t, err == nil)
+ // make the task terminal state
+ cluster.DeletePod(task1)
+ cluster.waitAndAssertTaskState(t, "app0001", "task0001",
cache.TaskStates().Completed)
+ // make sure the shim side has clean up the failed app
+ cluster.waitForApplicationDeletion(t, appID)
// submit again
task1 = createTestPod("root.a", appID, "task0001", taskResource)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]