This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 0c437953 [YUNIKORN-3123] Add retry logic to AssumePod to prevent PV races (#985)
0c437953 is described below
commit 0c43795305dfa0d29b4cfa9128666c376917afe5
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Sep 11 14:12:06 2025 +0200
[YUNIKORN-3123] Add retry logic to AssumePod to prevent PV races (#985)
Closes: #985
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/cache/context.go | 4 ++--
pkg/cache/scheduler_callback.go | 17 +++++++++++++++--
pkg/shim/scheduler_mock_test.go | 2 +-
3 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index c131d9e7..6e59be64 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -822,7 +822,7 @@ func (ctx *Context) AssumePod(name, node string) error {
if err != nil {
log.Log(log.ShimContext).Error("Failed to find pod volumes",
zap.String("podName", assumedPod.Name),
- zap.String("nodeName", assumedPod.Spec.NodeName),
+ zap.String("nodeName", node),
zap.Error(err))
return err
}
@@ -835,7 +835,7 @@ func (ctx *Context) AssumePod(name, node string) error {
err = fmt.Errorf("pod %s has conflicting volume claims: %s", pod.Name, sReason)
log.Log(log.ShimContext).Error("Pod has conflicting volume claims",
zap.String("podName", assumedPod.Name),
- zap.String("nodeName", assumedPod.Spec.NodeName),
+ zap.String("nodeName", node),
zap.Error(err))
return err
}
diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go
index 1a21f5a2..ab93e6f9 100644
--- a/pkg/cache/scheduler_callback.go
+++ b/pkg/cache/scheduler_callback.go
@@ -20,8 +20,11 @@ package cache
import (
"fmt"
+ "time"
"go.uber.org/zap"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/client-go/util/retry"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
@@ -62,8 +65,18 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons
}
task.setAllocationKey(alloc.AllocationKey)
-
- if err := callback.context.AssumePod(alloc.AllocationKey, alloc.NodeID); err != nil {
+ backOff := wait.Backoff{
+ Steps: 30,
+ Duration: time.Second,
+ Cap: 30 * time.Second,
+ }
+ err := retry.OnError(backOff, func(err error) bool {
+ log.Log(log.ShimRMCallback).Error("AssumePod failed, retrying", zap.Error(err))
+ return true
+ }, func() error {
+ return callback.context.AssumePod(alloc.AllocationKey, alloc.NodeID)
+ })
+ if err != nil {
task.FailWithEvent(err.Error(), "AssumePodError")
return err
}
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 44ee5869..f91a3c21 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -312,7 +312,7 @@ func (fc *MockScheduler) waitForApplicationStateInCore(appID, partition, expecte
return false
}
return true
- }, time.Second, 5*time.Second)
+ }, time.Second, 60*time.Second)
}
func (fc *MockScheduler) waitAndAssertForeignAllocationInCore(partition, allocationID, nodeID string, shouldExist bool) error {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]