This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d39ad93ad6 [Fix][Zeta] Fix hybrid deployment can not get worker when init (#7235)
d39ad93ad6 is described below

commit d39ad93ad6c7420e42cd8965c9c3bf90f4a4e6b2
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Fri Jul 19 19:42:03 2024 +0800

    [Fix][Zeta] Fix hybrid deployment can not get worker when init (#7235)
---
 .../engine/client/SeaTunnelClientTest.java         | 13 +++++++-----
 .../seatunnel/engine/server/SeaTunnelServer.java   |  6 ++++++
 .../resourcemanager/AbstractResourceManager.java   | 24 ++++++++++++++--------
 .../opeartion/SyncWorkerProfileOperation.java      |  6 +++++-
 .../resourcemanager/ResourceManagerTest.java       |  5 +++++
 5 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 1510e7727f..d7e55db4ec 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -392,11 +392,14 @@ public class SeaTunnelClientTest {
                     Assertions.assertThrows(
                             Exception.class,
                             () -> jobExecutionEnvWithSameJobId.execute().waitForJobCompleteV2());
-            Assertions.assertEquals(
-                    String.format(
-                            "The job id %s has already been submitted and is not starting with a savepoint.",
-                            jobId),
-                    exception.getCause().getMessage());
+            Assertions.assertTrue(
+                    exception
+                            .getCause()
+                            .getMessage()
+                            .contains(
+                                    String.format(
+                                            "The job id %s has already been submitted and is not starting with a savepoint.",
+                                            jobId)));
         } catch (Exception e) {
             throw new RuntimeException(e);
         } finally {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 765869fd03..b76af4c19a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -84,6 +84,12 @@ public class SeaTunnelServer
 
     /** Lazy load for Slot Service */
     public SlotService getSlotService() {
+        // If the node is master node, the slot service is not needed.
+        if (EngineConfig.ClusterRole.MASTER.ordinal()
+                == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
+            return null;
+        }
+
         if (slotService == null) {
             synchronized (this) {
                 if (slotService == null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index b830e5f056..6c04748ccc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -74,25 +74,31 @@ public abstract class AbstractResourceManager implements ResourceManager {
 
     private void initWorker() {
         log.info("initWorker... ");
-        List<Address> aliveWorker =
+        List<Address> aliveNode =
                 nodeEngine.getClusterService().getMembers().stream()
-                        .filter(Member::isLiteMember)
                         .map(Member::getAddress)
                         .collect(Collectors.toList());
-        log.info("initWorker live nodes: " + aliveWorker);
+        log.info("init live nodes: {}", aliveNode);
         List<CompletableFuture<Void>> futures =
-                aliveWorker.stream()
+                aliveNode.stream()
                         .map(
-                                worker ->
-                                        sendToMember(new SyncWorkerProfileOperation(), worker)
+                                node ->
+                                        sendToMember(new SyncWorkerProfileOperation(), node)
                                                 .thenAccept(
                                                         p -> {
-                                                            registerWorker.put(
-                                                                    worker, (WorkerProfile) p);
+                                                            if (p != null) {
+                                                                registerWorker.put(
+                                                                        node, (WorkerProfile) p);
+                                                                log.info(
+                                                                        "received new worker register: "
+                                                                                + ((WorkerProfile)
+                                                                                                p)
+                                                                                        .getAddress());
+                                                            }
                                                         }))
                         .collect(Collectors.toList());
         futures.forEach(CompletableFuture::join);
-        log.info("registerWorker: " + registerWorker);
+        log.info("registerWorker: {}", registerWorker);
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java
index ebe85e3daf..904629648a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java
@@ -33,7 +33,11 @@ public class SyncWorkerProfileOperation extends Operation implements IdentifiedD
     @Override
     public void run() throws Exception {
         SeaTunnelServer server = getService();
-        result = server.getSlotService().getWorkerProfile();
+        if (server.getSlotService() != null) {
+            result = server.getSlotService().getWorkerProfile();
+        } else {
+            result = null;
+        }
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
index abd4ccdc09..5ac803064a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
@@ -54,6 +54,11 @@ public class ResourceManagerTest extends AbstractSeaTunnelServerTest<ResourceMan
         server.getSlotService();
     }
 
+    @Test
+    public void testHaveWorkerWhenUseHybridDeployment() {
+        Assertions.assertEquals(1, resourceManager.workerCount(null));
+    }
+
     @Test
     public void testApplyRequest() throws ExecutionException, InterruptedException {
         List<ResourceProfile> resourceProfiles = new ArrayList<>();

Reply via email to