This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 877ae8c6a6 [Fix][Zeta] Fix operation thread leak when master switch (#9464)
877ae8c6a6 is described below

commit 877ae8c6a6fe8f015456857d443d862b0958d35e
Author: Jia Fan <[email protected]>
AuthorDate: Fri Jun 20 09:31:00 2025 +0800

    [Fix][Zeta] Fix operation thread leak when master switch (#9464)
---
 .../SeaTunnelEngineRetryableException.java         | 41 ++++++++++++++
 .../seatunnel/engine/server/SeaTunnelServer.java   | 32 +++--------
 .../engine/server/CoordinatorServiceTest.java      | 30 +++++++++++
 .../operation/ReturnRetryTimesOperation.java       | 49 +++++++++++++++++
 .../server/operation/TestSerializerHook.java       | 62 ++++++++++++++++++++++
 5 files changed, 188 insertions(+), 26 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SeaTunnelEngineRetryableException.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SeaTunnelEngineRetryableException.java
new file mode 100644
index 0000000000..05986ba974
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SeaTunnelEngineRetryableException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.exception;
+
+import com.hazelcast.spi.exception.RetryableException;
+
+public class SeaTunnelEngineRetryableException extends SeaTunnelEngineException
+        implements RetryableException {
+
+    public SeaTunnelEngineRetryableException() {
+        super();
+    }
+
+    public SeaTunnelEngineRetryableException(String message) {
+        super(message);
+    }
+
+    public SeaTunnelEngineRetryableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    @Override
+    public Throwable createException(String s, Throwable throwable) {
+        return new SeaTunnelEngineRetryableException(s, throwable);
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 36428cfbe0..98bd6210ef 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineRetryableException;
 import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
 import org.apache.seatunnel.engine.core.classloader.DefaultClassLoaderService;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -54,9 +55,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static com.hazelcast.spi.properties.ClusterProperty.INVOCATION_MAX_RETRY_COUNT;
-import static com.hazelcast.spi.properties.ClusterProperty.INVOCATION_RETRY_PAUSE;
-
 @Slf4j
 public class SeaTunnelServer
         implements ManagedService, MembershipAwareService, LiveOperationsTracker {
@@ -245,27 +243,8 @@ public class SeaTunnelServer
     public CoordinatorService getCoordinatorService() {
         int retryCount = 0;
         if (isMasterNode()) {
-            // The hazelcast operator request invocation will retry, We must wait enough time to
-            // wait the invocation return.
-            String hazelcastInvocationMaxRetry =
-                    seaTunnelConfig
-                            .getHazelcastConfig()
-                            .getProperty(INVOCATION_MAX_RETRY_COUNT.getName());
-            int maxRetry =
-                    hazelcastInvocationMaxRetry == null
-                            ? Integer.parseInt(INVOCATION_MAX_RETRY_COUNT.getDefaultValue()) * 2
-                            : Integer.parseInt(hazelcastInvocationMaxRetry) * 2;
-
-            String hazelcastRetryPause =
-                    seaTunnelConfig
-                            .getHazelcastConfig()
-                            .getProperty(INVOCATION_RETRY_PAUSE.getName());
-
-            int retryPause =
-                    hazelcastRetryPause == null
-                            ? Integer.parseInt(INVOCATION_RETRY_PAUSE.getDefaultValue())
-                            : Integer.parseInt(hazelcastRetryPause);
-
+            int maxRetry = 3;
+            int retryPause = 500;
             while (isRunning
                     && retryCount < maxRetry
                     && !coordinatorService.isCoordinatorActive()
@@ -286,8 +265,9 @@ public class SeaTunnelServer
             if (!isMasterNode()) {
                 throw new SeaTunnelEngineException("This is not a master node now.");
             }
-
-            throw new SeaTunnelEngineException(
+            // Return retryable exception to retry from the worker node, because the coordinator is
+            // not ready yet. By this way, we can release the operation thread and retry later.
+            throw new SeaTunnelEngineRetryableException(
                     "Can not get coordinator service from an active master node.");
         } else {
             throw new SeaTunnelEngineException(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index 0343ccc5ef..ae725722d4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+import org.apache.seatunnel.engine.server.operation.ReturnRetryTimesOperation;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import org.junit.jupiter.api.Assertions;
@@ -37,6 +38,7 @@ import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import com.hazelcast.internal.serialization.Data;
 
 import java.util.Collections;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.awaitility.Awaitility.await;
@@ -84,6 +86,34 @@ public class CoordinatorServiceTest {
         instance2.shutdown();
     }
 
+    @Test
+    public void testSeaTunnelEngineRetryableExceptionOperationCanBeRetryByHazelcast() {
+
+        HazelcastInstanceImpl instance =
+                SeaTunnelServerStarter.createHazelcastInstance(
+                        TestUtils.getClusterName(
+                                "CoordinatorServiceTest_testSeaTunnelEngineRetryableExceptionOperationCanBeRetryByHazelcast"));
+        try {
+            CompletionException exception =
+                    Assertions.assertThrows(
+                            CompletionException.class,
+                            () -> {
+                                NodeEngineUtil.sendOperationToMemberNode(
+                                                instance.node.getNodeEngine(),
+                                                new ReturnRetryTimesOperation(),
+                                                instance.getCluster().getLocalMember().getAddress())
+                                        .join();
+                            });
+            Assertions.assertTrue(
+                    exception
+                            .getCause()
+                            .getMessage()
+                            .contains("Retryable exception occurred, retry times: 250"));
+        } finally {
+            instance.shutdown();
+        }
+    }
+
     @Test
     public void testInvocationFutureUseCompletableFutureExecutor() {
         HazelcastInstanceImpl instance =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/operation/ReturnRetryTimesOperation.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/operation/ReturnRetryTimesOperation.java
new file mode 100644
index 0000000000..9f3a1f46fe
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/operation/ReturnRetryTimesOperation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineRetryableException;
+
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.AllowedDuringPassiveState;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ReturnRetryTimesOperation extends Operation
+        implements IdentifiedDataSerializable, AllowedDuringPassiveState {
+
+    private static final AtomicInteger retryTimes = new AtomicInteger(0);
+
+    @Override
+    public void run() {
+        retryTimes.getAndIncrement();
+        throw new SeaTunnelEngineRetryableException(
+                "Retryable exception occurred, retry times: " + retryTimes.get());
+    }
+
+    @Override
+    public int getFactoryId() {
+        return 0;
+    }
+
+    @Override
+    public int getClassId() {
+        return 0;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/operation/TestSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/operation/TestSerializerHook.java
new file mode 100644
index 0000000000..c68f939371
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/operation/TestSerializerHook.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import com.google.auto.service.AutoService;
+import com.hazelcast.internal.serialization.DataSerializerHook;
+import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
+import com.hazelcast.nio.serialization.DataSerializableFactory;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.annotation.PrivateApi;
+
+/**
+ * A Java Service Provider hook for Hazelcast's Identified Data Serializable mechanism. This is
+ * private API. All about the Job's data serializable define in this class.
+ */
+@AutoService(DataSerializerHook.class)
+@PrivateApi
+public final class TestSerializerHook implements DataSerializerHook {
+
+    public static final int RETURN_RETRY_TIMES = 0;
+
+    public static final int FACTORY_ID =
+            FactoryIdHelper.getFactoryId(TestSerializerHook.class.getName(), 0);
+
+    @Override
+    public int getFactoryId() {
+        return FACTORY_ID;
+    }
+
+    @Override
+    public DataSerializableFactory createFactory() {
+        return new Factory();
+    }
+
+    private static class Factory implements DataSerializableFactory {
+
+        @Override
+        public IdentifiedDataSerializable create(int typeId) {
+            switch (typeId) {
+                case RETURN_RETRY_TIMES:
+                    return new ReturnRetryTimesOperation();
+                default:
+                    throw new IllegalArgumentException("Unknown type id " + typeId);
+            }
+        }
+    }
+}

Reply via email to