This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 877ae8c6a6 [Fix][Zeta] Fix operation thread leak when master switch
(#9464)
877ae8c6a6 is described below
commit 877ae8c6a6fe8f015456857d443d862b0958d35e
Author: Jia Fan <[email protected]>
AuthorDate: Fri Jun 20 09:31:00 2025 +0800
[Fix][Zeta] Fix operation thread leak when master switch (#9464)
---
.../SeaTunnelEngineRetryableException.java | 41 ++++++++++++++
.../seatunnel/engine/server/SeaTunnelServer.java | 32 +++--------
.../engine/server/CoordinatorServiceTest.java | 30 +++++++++++
.../operation/ReturnRetryTimesOperation.java | 49 +++++++++++++++++
.../server/operation/TestSerializerHook.java | 62 ++++++++++++++++++++++
5 files changed, 188 insertions(+), 26 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SeaTunnelEngineRetryableException.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SeaTunnelEngineRetryableException.java
new file mode 100644
index 0000000000..05986ba974
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SeaTunnelEngineRetryableException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.exception;
+
+import com.hazelcast.spi.exception.RetryableException;
+
+/**
+ * A {@link SeaTunnelEngineException} that additionally implements Hazelcast's {@link
+ * RetryableException}, so that an operation failing with it is treated by Hazelcast as a
+ * retryable failure rather than a terminal one.
+ */
+public class SeaTunnelEngineRetryableException extends SeaTunnelEngineException
+        implements RetryableException {
+
+    /** Creates an instance through the parent's no-argument constructor. */
+    public SeaTunnelEngineRetryableException() {}
+
+    /**
+     * Creates an instance carrying a detail message.
+     *
+     * @param message the detail message passed to the parent constructor
+     */
+    public SeaTunnelEngineRetryableException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an instance carrying a detail message and a cause.
+     *
+     * @param message the detail message passed to the parent constructor
+     * @param cause the underlying cause passed to the parent constructor
+     */
+    public SeaTunnelEngineRetryableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Builds a new exception of this same retryable type from the given message and cause.
+     *
+     * @param message detail message for the new exception
+     * @param cause cause for the new exception
+     * @return a new {@code SeaTunnelEngineRetryableException}
+     */
+    @Override
+    public Throwable createException(String message, Throwable cause) {
+        return new SeaTunnelEngineRetryableException(message, cause);
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 36428cfbe0..98bd6210ef 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineRetryableException;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.classloader.DefaultClassLoaderService;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -54,9 +55,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import static
com.hazelcast.spi.properties.ClusterProperty.INVOCATION_MAX_RETRY_COUNT;
-import static
com.hazelcast.spi.properties.ClusterProperty.INVOCATION_RETRY_PAUSE;
-
@Slf4j
public class SeaTunnelServer
implements ManagedService, MembershipAwareService,
LiveOperationsTracker {
@@ -245,27 +243,8 @@ public class SeaTunnelServer
public CoordinatorService getCoordinatorService() {
int retryCount = 0;
if (isMasterNode()) {
- // The hazelcast operator request invocation will retry, We must
wait enough time to
- // wait the invocation return.
- String hazelcastInvocationMaxRetry =
- seaTunnelConfig
- .getHazelcastConfig()
- .getProperty(INVOCATION_MAX_RETRY_COUNT.getName());
- int maxRetry =
- hazelcastInvocationMaxRetry == null
- ?
Integer.parseInt(INVOCATION_MAX_RETRY_COUNT.getDefaultValue()) * 2
- : Integer.parseInt(hazelcastInvocationMaxRetry) *
2;
-
- String hazelcastRetryPause =
- seaTunnelConfig
- .getHazelcastConfig()
- .getProperty(INVOCATION_RETRY_PAUSE.getName());
-
- int retryPause =
- hazelcastRetryPause == null
- ?
Integer.parseInt(INVOCATION_RETRY_PAUSE.getDefaultValue())
- : Integer.parseInt(hazelcastRetryPause);
-
+ int maxRetry = 3;
+ int retryPause = 500;
while (isRunning
&& retryCount < maxRetry
&& !coordinatorService.isCoordinatorActive()
@@ -286,8 +265,9 @@ public class SeaTunnelServer
if (!isMasterNode()) {
throw new SeaTunnelEngineException("This is not a master node
now.");
}
-
- throw new SeaTunnelEngineException(
+ // Return retryable exception to retry from the worker node,
because the coordinator is
+ // not ready yet. By this way, we can release the operation thread
and retry later.
+ throw new SeaTunnelEngineRetryableException(
"Can not get coordinator service from an active master
node.");
} else {
throw new SeaTunnelEngineException(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index 0343ccc5ef..ae725722d4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -26,6 +26,7 @@ import
org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
+import org.apache.seatunnel.engine.server.operation.ReturnRetryTimesOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.junit.jupiter.api.Assertions;
@@ -37,6 +38,7 @@ import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.internal.serialization.Data;
import java.util.Collections;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
@@ -84,6 +86,34 @@ public class CoordinatorServiceTest {
instance2.shutdown();
}
+    /**
+     * Verifies that an operation failing with {@link
+     * org.apache.seatunnel.engine.common.exception.SeaTunnelEngineRetryableException} is retried by
+     * Hazelcast instead of failing on its first execution: the final failure message reports how
+     * many times the operation ran.
+     */
+    @Test
+    public void testSeaTunnelEngineRetryableExceptionOperationCanBeRetryByHazelcast() {
+        HazelcastInstanceImpl instance =
+                SeaTunnelServerStarter.createHazelcastInstance(
+                        TestUtils.getClusterName(
+                                "CoordinatorServiceTest_testSeaTunnelEngineRetryableExceptionOperationCanBeRetryByHazelcast"));
+        try {
+            CompletionException exception =
+                    Assertions.assertThrows(
+                            CompletionException.class,
+                            () ->
+                                    NodeEngineUtil.sendOperationToMemberNode(
+                                                    instance.node.getNodeEngine(),
+                                                    new ReturnRetryTimesOperation(),
+                                                    instance.getCluster()
+                                                            .getLocalMember()
+                                                            .getAddress())
+                                            .join());
+
+            // Fail with a readable assertion rather than an NPE if the future completed
+            // exceptionally without a cause, or the cause carries no message.
+            Throwable cause = exception.getCause();
+            Assertions.assertNotNull(cause, "CompletionException has no cause");
+            String message = cause.getMessage();
+
+            // NOTE(review): 250 presumably matches Hazelcast's default invocation retry limit
+            // (hazelcast.invocation.max.retry.count) - confirm if that default changes.
+            // ReturnRetryTimesOperation keeps a static, never-reset counter, so this expectation
+            // only holds when no other test in the same JVM has executed that operation.
+            Assertions.assertTrue(
+                    message != null
+                            && message.contains("Retryable exception occurred, retry times: 250"),
+                    () -> "Unexpected failure message: " + message);
+        } finally {
+            instance.shutdown();
+        }
+    }
+
@Test
public void testInvocationFutureUseCompletableFutureExecutor() {
HazelcastInstanceImpl instance =
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/operation/ReturnRetryTimesOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/operation/ReturnRetryTimesOperation.java
new file mode 100644
index 0000000000..9f3a1f46fe
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/operation/ReturnRetryTimesOperation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineRetryableException;
+
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.AllowedDuringPassiveState;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ReturnRetryTimesOperation extends Operation
+ implements IdentifiedDataSerializable, AllowedDuringPassiveState {
+
+ private static final AtomicInteger retryTimes = new AtomicInteger(0);
+
+ @Override
+ public void run() {
+ retryTimes.getAndIncrement();
+ throw new SeaTunnelEngineRetryableException(
+ "Retryable exception occurred, retry times: " +
retryTimes.get());
+ }
+
+ @Override
+ public int getFactoryId() {
+ return 0;
+ }
+
+ @Override
+ public int getClassId() {
+ return 0;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/operation/TestSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/operation/TestSerializerHook.java
new file mode 100644
index 0000000000..c68f939371
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/operation/TestSerializerHook.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import com.google.auto.service.AutoService;
+import com.hazelcast.internal.serialization.DataSerializerHook;
+import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
+import com.hazelcast.nio.serialization.DataSerializableFactory;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.annotation.PrivateApi;
+
+/**
+ * Test-only Java service-provider hook for Hazelcast's identified-data-serializable mechanism. Its
+ * factory maps type ids to the test operations in this package so that Hazelcast can instantiate
+ * them. This is private API.
+ *
+ * <p>NOTE(review): {@code @AutoService(DataSerializerHook.class)} writes the service file under
+ * the interface's binary name; confirm that Hazelcast actually discovers hooks from that file (it
+ * may instead read {@code META-INF/services/com.hazelcast.DataSerializerHook}).
+ */
+@AutoService(DataSerializerHook.class)
+@PrivateApi
+public final class TestSerializerHook implements DataSerializerHook {
+
+    /** Type id of {@link ReturnRetryTimesOperation}. */
+    public static final int RETURN_RETRY_TIMES = 0;
+
+    /**
+     * Factory id of this hook, resolved through {@link FactoryIdHelper} with this class's name as
+     * the lookup key and {@code 0} as the fallback value.
+     */
+    public static final int FACTORY_ID =
+            FactoryIdHelper.getFactoryId(TestSerializerHook.class.getName(), 0);
+
+    @Override
+    public int getFactoryId() {
+        return FACTORY_ID;
+    }
+
+    @Override
+    public DataSerializableFactory createFactory() {
+        return new TestOperationFactory();
+    }
+
+    /** Creates the test operations known to this hook from their type ids. */
+    private static final class TestOperationFactory implements DataSerializableFactory {
+
+        @Override
+        public IdentifiedDataSerializable create(int typeId) {
+            if (typeId == RETURN_RETRY_TIMES) {
+                return new ReturnRetryTimesOperation();
+            }
+            throw new IllegalArgumentException("Unknown type id " + typeId);
+        }
+    }
+}