This is an automated email from the ASF dual-hosted git repository.

jarvis pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dff3ef4656 [Fix][Zeta] If Zeta not a TCP discovery, it cannot find other members (#7757)
dff3ef4656 is described below

commit dff3ef4656b81ab74fb24b131035a2a32735c825
Author: Dongyeon Lee <dev.loust...@gmail.com>
AuthorDate: Sat Sep 28 10:00:41 2024 +0900

    [Fix][Zeta] If Zeta not a TCP discovery, it cannot find other members (#7757)
---
 .../seatunnel/engine/e2e/k8s/KubernetesIT.java     | 20 +++++++++++++++----
 ...st.yaml => hazelcast-kubernetes-discovery.yaml} | 11 ++++++-----
 ...hazelcast.yaml => hazelcast-tcp-discovery.yaml} |  0
 .../engine/server/SeaTunnelNodeContext.java        | 23 ++--------------------
 4 files changed, 24 insertions(+), 30 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
index 2de7caeddb..ce2b73fb10 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java
@@ -61,7 +61,18 @@ public class KubernetesIT {
     private static final String podName = "seatunnel-0";
 
     @Test
-    public void test()
+    public void testTcpDiscovery()
+            throws IOException, XmlPullParserException, ApiException, 
InterruptedException {
+        runDiscoveryTest("hazelcast-tcp-discovery.yaml");
+    }
+
+    @Test
+    public void testKubernetesDiscovery()
+            throws IOException, XmlPullParserException, ApiException, 
InterruptedException {
+        runDiscoveryTest("hazelcast-kubernetes-discovery.yaml");
+    }
+
+    private void runDiscoveryTest(String hazelCastConfigFile)
             throws IOException, XmlPullParserException, ApiException, 
InterruptedException {
         ApiClient client = Config.defaultClient();
         AppsV1Api appsV1Api = new AppsV1Api(client);
@@ -82,7 +93,7 @@ public class KubernetesIT {
         log.info("Docker's environmental information");
         log.info(info.toString());
         if 
(dockerClient.listImagesCmd().withImageNameFilter(tag).exec().isEmpty()) {
-            copyFileToCurrentResources(targetPath);
+            copyFileToCurrentResources(hazelCastConfigFile, targetPath);
             File file =
                     new File(
                             PROJECT_ROOT_PATH
@@ -153,7 +164,8 @@ public class KubernetesIT {
         }
     }
 
-    private void copyFileToCurrentResources(String targetPath) throws 
IOException {
+    private void copyFileToCurrentResources(String hazelCastConfigFile, String 
targetPath)
+            throws IOException {
         File jarsPath = new File(targetPath + "/jars");
         jarsPath.mkdirs();
         File binPath = new File(targetPath + "/bin");
@@ -164,7 +176,7 @@ public class KubernetesIT {
                 new File(PROJECT_ROOT_PATH + "/config"), new File(targetPath + 
"/config"));
         // replace hazelcast.yaml and hazelcast-client.yaml
         Files.copy(
-                Paths.get(targetPath + "/custom_config/hazelcast.yaml"),
+                Paths.get(targetPath + "/custom_config/" + 
hazelCastConfigFile),
                 Paths.get(targetPath + "/config/hazelcast.yaml"),
                 StandardCopyOption.REPLACE_EXISTING);
         Files.copy(
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml
similarity index 89%
copy from seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml
copy to seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml
index 10992ae39f..d4a79cd0e4 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml
@@ -26,12 +26,13 @@ hazelcast:
         DATA:
           enabled: true
     join:
-      tcp-ip:
+      multicast:
+        enabled: false
+      kubernetes:
         enabled: true
-        member-list:
-          - seatunnel-0.seatunnel.default.svc.cluster.local
-          - seatunnel-1.seatunnel.default.svc.cluster.local
-
+        service-port: 5801
+        namespace: default
+        service-name: seatunnel
     port:
       auto-increment: true
       port-count: 100
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-tcp-discovery.yaml
similarity index 100%
rename from seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml
rename to seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-tcp-discovery.yaml
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
index 8ea6b3cf5b..60174b8864 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java
@@ -24,14 +24,10 @@ import com.hazelcast.instance.impl.DefaultNodeContext;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.instance.impl.NodeExtension;
 import com.hazelcast.internal.cluster.Joiner;
-import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig;
-import static 
com.hazelcast.internal.config.AliasedDiscoveryConfigUtils.allUsePublicAddress;
-import static 
com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_ENABLED;
-import static 
com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_PUBLIC_IP_ENABLED;
 
 @Slf4j
 public class SeaTunnelNodeContext extends DefaultNodeContext {
@@ -53,26 +49,11 @@ public class SeaTunnelNodeContext extends 
DefaultNodeContext {
                 
getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
         join.verify();
 
-        if (node.shouldUseMulticastJoiner(join) && node.multicastService != 
null) {
-            super.createJoiner(node);
-        } else if (join.getTcpIpConfig().isEnabled()) {
+        if (join.getTcpIpConfig().isEnabled()) {
             log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery");
             return new LiteNodeDropOutTcpIpJoiner(node);
-        } else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
-                || isAnyAliasedConfigEnabled(join)
-                || join.isAutoDetectionEnabled()) {
-            super.createJoiner(node);
         }
-        return null;
-    }
-
-    private static boolean isAnyAliasedConfigEnabled(JoinConfig join) {
-        return 
!AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
-    }
 
-    private boolean usePublicAddress(JoinConfig join, Node node) {
-        return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
-                || allUsePublicAddress(
-                        
AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
+        return super.createJoiner(node);
     }
 }

Reply via email to