This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8c8122d2fe [Fix][Zeta] Fix job restore failure after master switch when IMap still loading from S3 (#10562)
8c8122d2fe is described below

commit 8c8122d2fe5976e332c3c314929b74faccb0f543
Author: Ricky Makhija <[email protected]>
AuthorDate: Tue Mar 17 19:16:36 2026 +0530

    [Fix][Zeta] Fix job restore failure after master switch when IMap still loading from S3 (#10562)
---
 .../engine/common/utils/ExceptionUtil.java         | 26 +++++++++++++-
 .../engine/common/utils/ExceptionUtilTest.java     | 24 +++++++++++++
 .../engine/server/CoordinatorService.java          | 42 +++++++++++++++++++---
 3 files changed, 86 insertions(+), 6 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
index 5148c10843..c812ba54f4 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
@@ -31,6 +31,7 @@ import com.hazelcast.client.impl.protocol.ClientProtocolErrorCodes;
 import com.hazelcast.core.HazelcastInstanceNotActiveException;
 import com.hazelcast.core.OperationTimeoutException;
 import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher;
+import com.hazelcast.spi.exception.RetryableHazelcastException;
 import lombok.NonNull;
 
 import java.lang.reflect.InvocationTargetException;
@@ -151,10 +152,33 @@ public final class ExceptionUtil {
         throw new RuntimeException("Never throw here.");
     }
 
+    /**
+     * Check if an exception indicates an operation that should be retried.
+     *
+     * <p>This method is used by {@link org.apache.seatunnel.common.utils.RetryUtils} to determine
+     * if a failed operation should be retried. It extracts the root cause of the exception chain
+     * and checks if it matches known transient exception types.
+     *
+     * <p>The following exception types are considered retryable:
+     *
+     * <ul>
+     *   <li>{@link HazelcastInstanceNotActiveException} - Hazelcast instance is shutting down
+     *   <li>{@link InterruptedException} - Operation was interrupted
+     *   <li>{@link OperationTimeoutException} - Operation timed out waiting for a response
+     *   <li>{@link RetryableHazelcastException} - Hazelcast explicitly marks the operation as
+     *       retryable, e.g., when an IMap partition is still loading data from external storage
+     *       (MapStore) during cluster startup or master switch
+     * </ul>
+     *
+     * @param e the exception to check (may be wrapped in CompletionException / ExecutionException)
+     * @return {@code true} if the root cause is a transient, retryable exception; {@code false}
+     *     otherwise
+     */
     public static boolean isOperationNeedRetryException(@NonNull Throwable e) {
         Throwable exception = ExceptionUtils.getRootException(e);
         return exception instanceof HazelcastInstanceNotActiveException
                 || exception instanceof InterruptedException
-                || exception instanceof OperationTimeoutException;
+                || exception instanceof OperationTimeoutException
+                || exception instanceof RetryableHazelcastException;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/ExceptionUtilTest.java b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/ExceptionUtilTest.java
index ee2e03a503..6df194a8c0 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/ExceptionUtilTest.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/ExceptionUtilTest.java
@@ -19,7 +19,11 @@ package org.apache.seatunnel.engine.common.utils;
 
 import org.junit.jupiter.api.Test;
 
+import com.hazelcast.spi.exception.RetryableHazelcastException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ExceptionUtilTest {
 
@@ -45,4 +49,24 @@ public class ExceptionUtilTest {
     void throwsNullPointerExceptionWhenNull() {
         assertThrows(NullPointerException.class, () -> ExceptionUtil.sneakyThrow(null));
     }
+
+    @Test
+    void testIsOperationNeedRetryException_withRetryableHazelcastException() {
+        RetryableHazelcastException exception = new RetryableHazelcastException("IMap loading");
+        assertTrue(ExceptionUtil.isOperationNeedRetryException(exception));
+    }
+
+    @Test
+    void testIsOperationNeedRetryException_withWrappedRetryableHazelcastException() {
+        Throwable exception =
+                new Exception(
+                        new RuntimeException(new RetryableHazelcastException("IMap loading")));
+        assertTrue(ExceptionUtil.isOperationNeedRetryException(exception));
+    }
+
+    @Test
+    void testIsOperationNeedRetryException_withNonRetryableException() {
+        Exception exception = new Exception("Non-retryable error");
+        assertFalse(ExceptionUtil.isOperationNeedRetryException(exception));
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index d32ded069e..1053a7a144 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.event.EventProcessor;
 import org.apache.seatunnel.api.tracing.MDCExecutorService;
 import org.apache.seatunnel.api.tracing.MDCTracer;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.common.utils.StringFormatUtils;
 import org.apache.seatunnel.engine.common.Constant;
@@ -39,6 +40,7 @@ import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.job.JobResult;
 import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
@@ -447,10 +449,26 @@ public class CoordinatorService {
     }
 
     private void restoreAllRunningJobFromMasterNodeSwitch() {
-        List<Map.Entry<Long, JobInfo>> needRestoreFromMasterNodeSwitchJobs =
-                runningJobInfoIMap.entrySet().stream()
-                        .filter(entry -> !runningJobMasterMap.containsKey(entry.getKey()))
-                        .collect(Collectors.toList());
+        List<Map.Entry<Long, JobInfo>> needRestoreFromMasterNodeSwitchJobs;
+        try {
+            needRestoreFromMasterNodeSwitchJobs =
+                    RetryUtils.retryWithException(
+                            () ->
+                                    runningJobInfoIMap.entrySet().stream()
+                                            .filter(
+                                                    entry ->
+                                                            !runningJobMasterMap.containsKey(
+                                                                    entry.getKey()))
+                                            .collect(Collectors.toList()),
+                            new RetryUtils.RetryMaterial(
+                                    Constant.OPERATION_RETRY_TIME,
+                                    true,
+                                    ExceptionUtil::isOperationNeedRetryException,
+                                    Constant.OPERATION_RETRY_SLEEP));
+        } catch (Exception e) {
+            throw new SeaTunnelEngineException(
+                    "Failed to fetch running jobs from IMap during master switch restore", e);
+        }
         if (needRestoreFromMasterNodeSwitchJobs.isEmpty()) {
             return;
         }
@@ -504,7 +522,21 @@ public class CoordinatorService {
     }
 
     private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobInfo jobInfo) {
-        if (runningJobStateIMap.get(jobId) == null) {
+        Object jobState;
+        try {
+            jobState =
+                    RetryUtils.retryWithException(
+                            () -> runningJobStateIMap.get(jobId),
+                            new RetryUtils.RetryMaterial(
+                                    Constant.OPERATION_RETRY_TIME,
+                                    true,
+                                    ExceptionUtil::isOperationNeedRetryException,
+                                    Constant.OPERATION_RETRY_SLEEP));
+        } catch (Exception e) {
+            throw new SeaTunnelEngineException(
+                    String.format("Job id %s restore failed, can not get job state", jobId), e);
+        }
+        if (jobState == null) {
             runningJobInfoIMap.remove(jobId);
             return;
         }

Reply via email to