This is an automated email from the ASF dual-hosted git repository. jarvis pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new dff3ef4656 [Fix][Zeta] If Zeta not a TCP discovery, it cannot find other members (#7757) dff3ef4656 is described below commit dff3ef4656b81ab74fb24b131035a2a32735c825 Author: Dongyeon Lee <dev.loust...@gmail.com> AuthorDate: Sat Sep 28 10:00:41 2024 +0900 [Fix][Zeta] If Zeta not a TCP discovery, it cannot find other members (#7757) --- .../seatunnel/engine/e2e/k8s/KubernetesIT.java | 20 +++++++++++++++---- ...st.yaml => hazelcast-kubernetes-discovery.yaml} | 11 ++++++----- ...hazelcast.yaml => hazelcast-tcp-discovery.yaml} | 0 .../engine/server/SeaTunnelNodeContext.java | 23 ++-------------------- 4 files changed, 24 insertions(+), 30 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java index 2de7caeddb..ce2b73fb10 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java @@ -61,7 +61,18 @@ public class KubernetesIT { private static final String podName = "seatunnel-0"; @Test - public void test() + public void testTcpDiscovery() + throws IOException, XmlPullParserException, ApiException, InterruptedException { + runDiscoveryTest("hazelcast-tcp-discovery.yaml"); + } + + @Test + public void testKubernetesDiscovery() + throws IOException, XmlPullParserException, ApiException, InterruptedException { + runDiscoveryTest("hazelcast-kubernetes-discovery.yaml"); + } + + private void runDiscoveryTest(String hazelCastConfigFile) throws IOException, XmlPullParserException, ApiException, InterruptedException { ApiClient client = Config.defaultClient(); AppsV1Api appsV1Api = new AppsV1Api(client); @@ -82,7 +93,7 @@ public class KubernetesIT { log.info("Docker's environmental information"); log.info(info.toString()); if (dockerClient.listImagesCmd().withImageNameFilter(tag).exec().isEmpty()) { - copyFileToCurrentResources(targetPath); + copyFileToCurrentResources(hazelCastConfigFile, targetPath); File file = new File( PROJECT_ROOT_PATH @@ -153,7 +164,8 @@ public class KubernetesIT { } } - private void copyFileToCurrentResources(String targetPath) throws IOException { + private void copyFileToCurrentResources(String hazelCastConfigFile, String targetPath) + throws IOException { File jarsPath = new File(targetPath + "/jars"); jarsPath.mkdirs(); File binPath = new File(targetPath + "/bin"); @@ -164,7 +176,7 @@ public class KubernetesIT { new File(PROJECT_ROOT_PATH + "/config"), new File(targetPath + "/config")); // replace hazelcast.yaml and hazelcast-client.yaml Files.copy( - Paths.get(targetPath + "/custom_config/hazelcast.yaml"), + Paths.get(targetPath + "/custom_config/" + hazelCastConfigFile), Paths.get(targetPath + "/config/hazelcast.yaml"), StandardCopyOption.REPLACE_EXISTING); Files.copy( diff --git a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml similarity index 89% copy from seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml copy to seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml index 10992ae39f..d4a79cd0e4 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml @@ -26,12 +26,13 @@ hazelcast: DATA: enabled: true join: - tcp-ip: + multicast: + enabled: false + kubernetes: enabled: true - member-list: - - seatunnel-0.seatunnel.default.svc.cluster.local - - seatunnel-1.seatunnel.default.svc.cluster.local - + service-port: 5801 + namespace: default + service-name: seatunnel port: auto-increment: true port-count: 100 diff --git a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-tcp-discovery.yaml similarity index 100% rename from seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml rename to seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-tcp-discovery.yaml diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java index 8ea6b3cf5b..60174b8864 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java @@ -24,14 +24,10 @@ import com.hazelcast.instance.impl.DefaultNodeContext; import com.hazelcast.instance.impl.Node; import com.hazelcast.instance.impl.NodeExtension; import com.hazelcast.internal.cluster.Joiner; -import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig; -import static com.hazelcast.internal.config.AliasedDiscoveryConfigUtils.allUsePublicAddress; -import static com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_ENABLED; -import static com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_PUBLIC_IP_ENABLED; @Slf4j public class SeaTunnelNodeContext extends DefaultNodeContext { @@ -53,26 +49,11 @@ public class SeaTunnelNodeContext extends DefaultNodeContext { getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin(); join.verify(); - if (node.shouldUseMulticastJoiner(join) && node.multicastService != null) { - super.createJoiner(node); - } else if (join.getTcpIpConfig().isEnabled()) { + if (join.getTcpIpConfig().isEnabled()) { log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery"); return new LiteNodeDropOutTcpIpJoiner(node); - } else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED) - || isAnyAliasedConfigEnabled(join) - || join.isAutoDetectionEnabled()) { - super.createJoiner(node); } - return null; - } - - private static boolean isAnyAliasedConfigEnabled(JoinConfig join) { - return !AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty(); - } - private boolean usePublicAddress(JoinConfig join, Node node) { - return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED) - || allUsePublicAddress( - AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join)); + return super.createJoiner(node); } }