This is an automated email from the ASF dual-hosted git repository.

pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c437953 [YUNIKORN-3123] Add retry logic to AssumePod to prevent PV races (#985)
0c437953 is described below

commit 0c43795305dfa0d29b4cfa9128666c376917afe5
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Sep 11 14:12:06 2025 +0200

    [YUNIKORN-3123] Add retry logic to AssumePod to prevent PV races (#985)
    
    Closes: #985
    
    Signed-off-by: Peter Bacsko <[email protected]>
---
 pkg/cache/context.go            |  4 ++--
 pkg/cache/scheduler_callback.go | 17 +++++++++++++++--
 pkg/shim/scheduler_mock_test.go |  2 +-
 3 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index c131d9e7..6e59be64 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -822,7 +822,7 @@ func (ctx *Context) AssumePod(name, node string) error {
                        if err != nil {
                                log.Log(log.ShimContext).Error("Failed to find 
pod volumes",
                                        zap.String("podName", assumedPod.Name),
-                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.String("nodeName", node),
                                        zap.Error(err))
                                return err
                        }
@@ -835,7 +835,7 @@ func (ctx *Context) AssumePod(name, node string) error {
                                err = fmt.Errorf("pod %s has conflicting volume 
claims: %s", pod.Name, sReason)
                                log.Log(log.ShimContext).Error("Pod has 
conflicting volume claims",
                                        zap.String("podName", assumedPod.Name),
-                                       zap.String("nodeName", 
assumedPod.Spec.NodeName),
+                                       zap.String("nodeName", node),
                                        zap.Error(err))
                                return err
                        }
diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go
index 1a21f5a2..ab93e6f9 100644
--- a/pkg/cache/scheduler_callback.go
+++ b/pkg/cache/scheduler_callback.go
@@ -20,8 +20,11 @@ package cache
 
 import (
        "fmt"
+       "time"
 
        "go.uber.org/zap"
+       "k8s.io/apimachinery/pkg/util/wait"
+       "k8s.io/client-go/util/retry"
 
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
        "github.com/apache/yunikorn-k8shim/pkg/dispatcher"
@@ -62,8 +65,18 @@ func (callback *AsyncRMCallback) UpdateAllocation(response 
*si.AllocationRespons
                }
 
                task.setAllocationKey(alloc.AllocationKey)
-
-               if err := callback.context.AssumePod(alloc.AllocationKey, 
alloc.NodeID); err != nil {
+               backOff := wait.Backoff{
+                       Steps:    30,
+                       Duration: time.Second,
+                       Cap:      30 * time.Second,
+               }
+               err := retry.OnError(backOff, func(err error) bool {
+                       log.Log(log.ShimRMCallback).Error("AssumePod failed, 
retrying", zap.Error(err))
+                       return true
+               }, func() error {
+                       return callback.context.AssumePod(alloc.AllocationKey, 
alloc.NodeID)
+               })
+               if err != nil {
                        task.FailWithEvent(err.Error(), "AssumePodError")
                        return err
                }
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index 44ee5869..f91a3c21 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -312,7 +312,7 @@ func (fc *MockScheduler) 
waitForApplicationStateInCore(appID, partition, expecte
                        return false
                }
                return true
-       }, time.Second, 5*time.Second)
+       }, time.Second, 60*time.Second)
 }
 
 func (fc *MockScheduler) waitAndAssertForeignAllocationInCore(partition, 
allocationID, nodeID string, shouldExist bool) error {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to