This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new ce6dbb2ae [ST-Engine][seatunnel-seatunnel-starter]
seatunnel-seatunnel-starter (#2358)
ce6dbb2ae is described below
commit ce6dbb2ae250638e40c32bdf2137c972132ecbf6
Author: Eric <[email protected]>
AuthorDate: Fri Aug 5 14:43:33 2022 +0800
[ST-Engine][seatunnel-seatunnel-starter] seatunnel-seatunnel-starter
(#2358)
* Add SeaTunnel Engine ConfigProvider and seatunnel-seatunnel-starter
* add source file to licenserc.yaml
Co-authored-by: Wenjun Ruan <[email protected]>
---
.licenserc.yaml | 6 ++
LICENSE | 6 +-
seatunnel-core/pom.xml | 1 +
.../seatunnel/core/starter/config/EngineType.java | 1 +
seatunnel-core/seatunnel-seatunnel-starter/pom.xml | 51 ++++++++++
.../src/main/bin/start-seatunnel-engine-job.sh | 64 ++++++++++++
.../core/starter/seatunnel/CommandLineUtils.java} | 24 +++--
.../core/starter/seatunnel/SeaTunnelStarter.java | 50 ++++++++++
.../seatunnel/args/SeaTunnelCommandArgs.java | 50 ++++++++++
.../seatunnel/engine/client/JobConfigParser.java | 13 ++-
.../engine/client/JobExecutionEnvironment.java | 2 +-
.../seatunnel/engine/client/SeaTunnelClient.java | 5 +-
.../engine/client/SeaTunnelHazelcastClient.java | 7 +-
.../engine/client/JobConfigParserTest.java | 5 +-
.../engine/client/LogicalDagGeneratorTest.java | 2 +-
.../engine/client/SeaTunnelClientTest.java | 24 ++---
.../apache/seatunnel/engine/common/Constant.java | 14 ++-
.../engine/common/config/ConfigProvider.java | 110 +++++++++++++++++++++
.../engine/common/config/EngineConfig.java | 21 ++--
.../common/config}/SeaTunnelClientConfig.java | 2 +-
.../engine/common/config/SeaTunnelConfig.java | 71 +++++++++++++
.../common/config/SeaTunnelConfigSections.java | 49 +++++++++
.../engine/common/config/SeaTunnelProperties.java | 21 ++--
.../common/config/YamlSeaTunnelConfigBuilder.java | 105 ++++++++++++++++++++
.../common/config/YamlSeaTunnelConfigLocator.java | 61 ++++++++++++
.../config/YamlSeaTunnelDomConfigProcessor.java | 79 +++++++++++++++
.../engine/common/utils/ExceptionUtil.java | 13 ++-
.../src/main/resources/hazelcast-client.yaml | 23 +++++
.../src/main/resources/hazelcast.yaml | 29 ++++++
.../src/main/resources/seatunnel-default.yaml | 20 ++++
.../engine/server/operation/AsyncOperation.java | 13 ++-
.../task/AbstractSeaTunnelMessageTask.java | 13 ++-
32 files changed, 876 insertions(+), 79 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 0fd78f91c..a105be8bc 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -40,5 +40,11 @@ header:
- '**/NOTICE'
- '**/.gitkeep'
- '**/com/typesafe/config/**'
+ -
'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java'
+ -
'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java'
+ -
'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java'
+ -
'seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java'
+ -
'seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java'
+ -
'seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java'
comment: on-failure
diff --git a/LICENSE b/LICENSE
index 7d4d93ac3..696821eed 100644
--- a/LICENSE
+++ b/LICENSE
@@ -219,4 +219,8 @@
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade
generate_client_protocol.sh
from
https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
from https://github.com/hazelcast/hazelcast
-seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
from https://github.com/hazelcast/hazelcast
\ No newline at end of file
+seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java
from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java
from https://github.com/hazelcast/hazelcast
+seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java
from https://github.com/hazelcast/hazelcast
+
diff --git a/seatunnel-core/pom.xml b/seatunnel-core/pom.xml
index 937fe767b..d5f96fcbd 100644
--- a/seatunnel-core/pom.xml
+++ b/seatunnel-core/pom.xml
@@ -38,5 +38,6 @@
<module>seatunnel-core-starter</module>
<module>seatunnel-flink-starter</module>
<module>seatunnel-spark-starter</module>
+ <module>seatunnel-seatunnel-starter</module>
</modules>
</project>
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
index 50d2e435e..b0c47d2ac 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.config;
public enum EngineType {
SPARK("spark"),
FLINK("flink"),
+ SEATUNNEL("seatunnel"),
;
private final String engine;
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
b/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
new file mode 100644
index 000000000..d2dd1ddbe
--- /dev/null
+++ b/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-core</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-seatunnel-starter</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-core-starter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-engine-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-engine-job.sh
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-engine-job.sh
new file mode 100755
index 000000000..1cfc741d8
--- /dev/null
+++
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-engine-job.sh
@@ -0,0 +1,64 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -eu
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ] ; do
+ # shellcheck disable=SC2006
+ ls=`ls -ld "$PRG"`
+ # shellcheck disable=SC2006
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ # shellcheck disable=SC2006
+ PRG=`dirname "$PRG"`/"$link"
+ fi
+done
+
+PRG_DIR=`dirname "$PRG"`
+APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
+CONF_DIR=${APP_DIR}/config
+APP_JAR=${APP_DIR}/lib/seatunnel-seatunnel-starter.jar
+APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelStarter"
+
+if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
+ . "${CONF_DIR}/seatunnel-env.sh"
+fi
+
+if [ $# == 0 ]
+then
+ args="-h"
+else
+ args=$@
+fi
+
+CMD=$(java -cp ${APP_JAR} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
+if [ ${EXIT_CODE} -eq 234 ]; then
+ # print usage
+ echo "${CMD}"
+ exit 0
+elif [ ${EXIT_CODE} -eq 0 ]; then
+ echo "Execute SeaTunnel Job: ${CMD}"
+ eval ${CMD}
+else
+ echo "${CMD}"
+ exit ${EXIT_CODE}
+fi
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
similarity index 55%
copy from
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
copy to
seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
index 50d2e435e..fd204e87a 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
+++
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
@@ -15,20 +15,24 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.config;
+package org.apache.seatunnel.core.starter.seatunnel;
-public enum EngineType {
- SPARK("spark"),
- FLINK("flink"),
- ;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
- private final String engine;
+import com.beust.jcommander.JCommander;
- EngineType(String engine) {
- this.engine = engine;
+public class CommandLineUtils {
+
+ private CommandLineUtils() {
+ throw new UnsupportedOperationException("CommandLineUtils is a utility
class and cannot be instantiated");
}
- public String getEngine() {
- return engine;
+ public static SeaTunnelCommandArgs parseSeaTunnelArgs(String[] args) {
+ SeaTunnelCommandArgs seatunnelCommandArgs = new SeaTunnelCommandArgs();
+ JCommander.newBuilder()
+ .addObject(seatunnelCommandArgs)
+ .build()
+ .parse(args);
+ return seatunnelCommandArgs;
}
}
diff --git
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
new file mode 100644
index 000000000..6c3372ad7
--- /dev/null
+++
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.utils.FileUtils;
+import org.apache.seatunnel.engine.client.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.JobProxy;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+
+import com.hazelcast.client.config.ClientConfig;
+
+import java.nio.file.Path;
+
+public class SeaTunnelStarter {
+ public static void main(String[] args) {
+ SeaTunnelCommandArgs seaTunnelCommandArgs =
CommandLineUtils.parseSeaTunnelArgs(args);
+ Path configFile = FileUtils.getConfigPath(seaTunnelCommandArgs);
+ Common.setDeployMode(DeployMode.CLIENT);
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("fake_to_file");
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(configFile.toString(), jobConfig);
+
+ JobProxy jobProxy = jobExecutionEnv.execute();
+
+ // TODO wait for job complete and then exit
+ }
+}
diff --git
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
new file mode 100644
index 000000000..19c5d8587
--- /dev/null
+++
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel.args;
+
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
+import org.apache.seatunnel.core.starter.config.EngineType;
+
+import java.util.List;
+
+public class SeaTunnelCommandArgs extends AbstractCommandArgs {
+
+ /**
+ * Undefined parameters parsed will be stored here as seatunnel engint
command parameters.
+ */
+ private List<String> seatunnelParams;
+
+ @Override
+ public EngineType getEngineType() {
+ return EngineType.SEATUNNEL;
+ }
+
+ @Override
+ public DeployMode getDeployMode() {
+ return DeployMode.CLIENT;
+ }
+
+ public List<String> getSeatunnelParams() {
+ return seatunnelParams;
+ }
+
+ public void setSeatunnelParams(List<String> seatunnelParams) {
+ this.seatunnelParams = seatunnelParams;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
index 42d563225..cbd014db7 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobConfigParser.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.engine.common.config.JobConfig;
import
org.apache.seatunnel.engine.common.exception.JobDefineCheckExceptionSeaTunnel;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -70,13 +71,17 @@ public class JobConfigParser {
private List<Action> actions = new ArrayList<>();
private Set<URL> jarUrlsSet = new HashSet<>();
- protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull
IdGenerator idGenerator) {
+ private JobConfig jobConfig;
+
+ protected JobConfigParser(@NonNull String jobDefineFilePath, @NonNull
IdGenerator idGenerator, @NonNull JobConfig jobConfig) {
this.jobDefineFilePath = jobDefineFilePath;
this.idGenerator = idGenerator;
+ this.jobConfig = jobConfig;
}
public ImmutablePair<List<Action>, Set<URL>> parse() {
Config seaTunnelJobConfig = new
ConfigBuilder(Paths.get(jobDefineFilePath)).getConfig();
+ Config envConfigs = seaTunnelJobConfig.getConfig("env");
List<? extends Config> sinkConfigs =
seaTunnelJobConfig.getConfigList("sink");
List<? extends Config> transformConfigs =
seaTunnelJobConfig.getConfigList("transform");
List<? extends Config> sourceConfigs =
seaTunnelJobConfig.getConfigList("source");
@@ -85,6 +90,8 @@ public class JobConfigParser {
throw new JobDefineCheckExceptionSeaTunnel("Source And Sink can
not be null");
}
+ jobConfigAnalyze(envConfigs);
+
if (sinkConfigs.size() == 1
&& sourceConfigs.size() == 1
&& (CollectionUtils.isEmpty(transformConfigs) ||
transformConfigs.size() == 1)) {
@@ -95,6 +102,10 @@ public class JobConfigParser {
return new ImmutablePair<>(actions, jarUrlsSet);
}
+ private void jobConfigAnalyze(Config envConfigs) {
+ // TODO Resolve env configuration and set jobConfig
+ }
+
/**
* If there are multiple sources or multiple transforms or multiple sink,
We will rely on
* source_table_name and result_table_name to build actions pipeline.
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
index a8257bebd..94b965460 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/JobExecutionEnvironment.java
@@ -59,7 +59,7 @@ public class JobExecutionEnvironment {
}
private JobConfigParser getJobConfigParser() {
- return new JobConfigParser(jobFilePath, idGenerator);
+ return new JobConfigParser(jobFilePath, idGenerator, jobConfig);
}
public void addAction(List<Action> actions) {
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 5aa194ce7..e182c527e 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -20,14 +20,15 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.engine.common.config.JobConfig;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.logging.ILogger;
import lombok.NonNull;
public class SeaTunnelClient implements SeaTunnelClientInstance {
private SeaTunnelHazelcastClient hazelcastClient;
- public SeaTunnelClient(@NonNull SeaTunnelClientConfig
seaTunnelClientConfig) {
- this.hazelcastClient = new
SeaTunnelHazelcastClient(seaTunnelClientConfig);
+ public SeaTunnelClient(@NonNull ClientConfig clientConfig) {
+ this.hazelcastClient = new SeaTunnelHazelcastClient(clientConfig);
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
index 6a696d628..369d290bb 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
+import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.client.impl.protocol.ClientMessage;
@@ -37,11 +38,11 @@ public class SeaTunnelHazelcastClient {
private final HazelcastClientInstanceImpl hazelcastClient;
private final SerializationService serializationService;
- public SeaTunnelHazelcastClient(@NonNull SeaTunnelClientConfig
seaTunnelClientConfig) {
- Preconditions.checkNotNull(seaTunnelClientConfig, "config");
+ public SeaTunnelHazelcastClient(@NonNull ClientConfig clientConfig) {
+ Preconditions.checkNotNull(clientConfig, "config");
this.hazelcastClient =
((HazelcastClientProxy)
com.hazelcast.client.HazelcastClient.newHazelcastClient(
- seaTunnelClientConfig)).client;
+ clientConfig)).client;
this.serializationService = hazelcastClient.getSerializationService();
ExceptionUtil.registerSeaTunnelExceptions(hazelcastClient.getClientExceptionFactory());
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index f9da29cf0..4534c0a7b 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -40,7 +41,7 @@ public class JobConfigParserTest {
public void testSimpleJobParse() {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/fakesource_to_file.conf");
- JobConfigParser jobConfigParser = new JobConfigParser(filePath, new
IdGenerator());
+ JobConfigParser jobConfigParser = new JobConfigParser(filePath, new
IdGenerator(), new JobConfig());
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
Assert.assertEquals(1, actions.size());
@@ -58,7 +59,7 @@ public class JobConfigParserTest {
public void testComplexJobParse() {
Common.setDeployMode(DeployMode.CLIENT);
String filePath =
TestUtils.getResource("/fakesource_to_file_complex.conf");
- JobConfigParser jobConfigParser = new JobConfigParser(filePath, new
IdGenerator());
+ JobConfigParser jobConfigParser = new JobConfigParser(filePath, new
IdGenerator(), new JobConfig());
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
Assert.assertEquals(1, actions.size());
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index c5f282bda..fb1e8f68d 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -48,7 +48,7 @@ public class LogicalDagGeneratorTest {
jobConfig.setName("fake_to_file");
IdGenerator idGenerator = new IdGenerator();
- ImmutablePair<List<Action>, Set<URL>> immutablePair = new
JobConfigParser(filePath, idGenerator).parse();
+ ImmutablePair<List<Action>, Set<URL>> immutablePair = new
JobConfigParser(filePath, idGenerator, new JobConfig()).parse();
LogicalDagGenerator logicalDagGenerator =
new LogicalDagGenerator(immutablePair.getLeft(), jobConfig,
idGenerator);
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 51fd02134..2c451034c 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -20,12 +20,14 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
import com.google.common.collect.Lists;
-import com.hazelcast.config.Config;
+import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -39,19 +41,16 @@ public class SeaTunnelClientTest {
@BeforeClass
public static void beforeClass() throws Exception {
- Config config = new Config();
- config.getSecurityConfig().setEnabled(false);
- config.getJetConfig().setEnabled(false);
- config.setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
- config.getNetworkConfig().setPort(50001);
- HazelcastInstanceFactory.newHazelcastInstance(config,
Thread.currentThread().getName(),
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+
HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
+ Thread.currentThread().getName(),
new SeaTunnelNodeContext());
}
@Test
public void testSayHello() {
SeaTunnelClientConfig seaTunnelClientConfig = new
SeaTunnelClientConfig();
-
seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:50001"));
+
seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:5801"));
SeaTunnelClient engineClient = new
SeaTunnelClient(seaTunnelClientConfig);
String msg = "Hello world";
@@ -67,11 +66,12 @@ public class SeaTunnelClientTest {
jobConfig.setBoundedness(Boundedness.BOUNDED);
jobConfig.setName("fake_to_file");
- SeaTunnelClientConfig seaTunnelClientConfig = new
SeaTunnelClientConfig();
-
seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:50001"));
- SeaTunnelClient engineClient = new
SeaTunnelClient(seaTunnelClientConfig);
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);
JobProxy jobProxy = jobExecutionEnv.execute();
+
+ Assert.assertNotNull(jobProxy);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 615b6ca48..9c016d91f 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -22,5 +22,17 @@ public class Constant {
public static final String SEATUNNEL_ID_GENERATOR_NAME =
"SeaTunnelIdGenerator";
- public static final String DEFAULT_SEATUNNEL_CLUSTER_NAME = "SeaTunnel";
+ public static final String DEFAULT_SEATUNNEL_CLUSTER_NAME = "seatunnel";
+
+ /**
+ * The default port number for the cluster auto-discovery mechanism's
+ * multicast communication.
+ */
+ public static final int DEFAULT_SEATUNNEL_MULTICAST_PORT = 53326;
+
+ public static final String SYSPROP_SEATUNNEL_CONFIG =
"hazelcast.seatunnel.config";
+
+ public static final String HAZELCAST_SEATUNNEL_CONF_FILE_PREFIX =
"seatunnel";
+
+ public static final String HAZELCAST_SEATUNNEL_DEFAULT_YAML =
"seatunnel-default.yaml";
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java
new file mode 100644
index 000000000..591afc0ec
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/ConfigProvider.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import static
com.hazelcast.internal.config.DeclarativeConfigUtil.SYSPROP_CLIENT_CONFIG;
+import static
com.hazelcast.internal.config.DeclarativeConfigUtil.SYSPROP_MEMBER_CONFIG;
+import static
com.hazelcast.internal.config.DeclarativeConfigUtil.validateSuffixInSystemProperty;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.client.config.YamlClientConfigBuilder;
+import com.hazelcast.client.config.impl.YamlClientConfigLocator;
+import com.hazelcast.config.Config;
+import com.hazelcast.config.YamlConfigBuilder;
+import com.hazelcast.internal.config.YamlConfigLocator;
+import lombok.NonNull;
+
+import java.util.Properties;
+
+/**
+ * Locates and loads SeaTunnel or SeaTunnel Client configurations from various
locations.
+ *
+ * @see YamlSeaTunnelConfigLocator
+ */
+public final class ConfigProvider {
+
+ private ConfigProvider() {
+ }
+
+ public static SeaTunnelConfig locateAndGetSeaTunnelConfig() {
+ return locateAndGetSeaTunnelConfig(null);
+ }
+
+ @NonNull
+ public static SeaTunnelConfig locateAndGetSeaTunnelConfig(Properties
properties) {
+
+ YamlSeaTunnelConfigLocator yamlConfigLocator = new
YamlSeaTunnelConfigLocator();
+ SeaTunnelConfig config;
+
+ if (yamlConfigLocator.locateFromSystemProperty()) {
+ // 1. Try loading YAML config if provided in system property
+ config = new
YamlSeaTunnelConfigBuilder(yamlConfigLocator).setProperties(properties).build();
+
+ } else if (yamlConfigLocator.locateInWorkDirOrOnClasspath()) {
+ // 2. Try loading YAML config from the working directory or from
the classpath
+ config = new
YamlSeaTunnelConfigBuilder(yamlConfigLocator).setProperties(properties).build();
+ } else {
+ // 3. Loading the default YAML configuration file
+ yamlConfigLocator.locateDefault();
+ config = new
YamlSeaTunnelConfigBuilder(yamlConfigLocator).setProperties(properties).build();
+ }
+ return config;
+
+ }
+
+ @NonNull
+ public static ClientConfig locateAndGetClientConfig() {
+ validateSuffixInSystemProperty(SYSPROP_CLIENT_CONFIG);
+
+ ClientConfig config;
+ YamlClientConfigLocator yamlConfigLocator = new
YamlClientConfigLocator();
+
+ if (yamlConfigLocator.locateFromSystemProperty()) {
+ // 1. Try loading config if provided in system property and it is
an YAML file
+ config = new
YamlClientConfigBuilder(yamlConfigLocator.getIn()).build();
+ } else if (yamlConfigLocator.locateInWorkDirOrOnClasspath()) {
+ // 2. Try loading YAML config from the working directory or from
the classpath
+ config = new
YamlClientConfigBuilder(yamlConfigLocator.getIn()).build();
+ } else {
+ // 3. Loading the default YAML configuration file
+ yamlConfigLocator.locateDefault();
+ config = new
YamlClientConfigBuilder(yamlConfigLocator.getIn()).build();
+ }
+ return config;
+ }
+
+ @NonNull
+ public static Config locateAndGetMemberConfig(Properties properties) {
+ validateSuffixInSystemProperty(SYSPROP_MEMBER_CONFIG);
+
+ Config config;
+ YamlConfigLocator yamlConfigLocator = new YamlConfigLocator();
+
+ if (yamlConfigLocator.locateFromSystemProperty()) {
+ // 1. Try loading config if provided in system property and it is
an YAML file
+ config = new
YamlConfigBuilder(yamlConfigLocator.getIn()).setProperties(properties).build();
+ } else if (yamlConfigLocator.locateInWorkDirOrOnClasspath()) {
+ // 2. Try loading YAML config from the working directory or from
the classpath
+ config = new
YamlConfigBuilder(yamlConfigLocator.getIn()).setProperties(properties).build();
+ } else {
+ // 3. Loading the default YAML configuration file
+ yamlConfigLocator.locateDefault();
+ config = new
YamlConfigBuilder(yamlConfigLocator.getIn()).setProperties(properties).build();
+ }
+ return config;
+ }
+}
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
similarity index 68%
copy from
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
copy to
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index 50d2e435e..e62481cf8 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -15,20 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.config;
+package org.apache.seatunnel.engine.common.config;
-public enum EngineType {
- SPARK("spark"),
- FLINK("flink"),
- ;
+import static com.hazelcast.internal.util.Preconditions.checkBackupCount;
- private final String engine;
+import lombok.Data;
- EngineType(String engine) {
- this.engine = engine;
- }
+@Data
+public class EngineConfig {
+ private int backupCount;
- public String getEngine() {
- return engine;
+ public EngineConfig setBackupCount(int newBackupCount) {
+ checkBackupCount(newBackupCount, 0);
+ this.backupCount = newBackupCount;
+ return this;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelClientConfig.java
similarity index 95%
rename from
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
rename to
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelClientConfig.java
index c6c6972a3..55daaef25 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelClientConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.common.config;
import org.apache.seatunnel.engine.common.Constant;
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
new file mode 100644
index 000000000..98f6c4732
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import org.apache.seatunnel.engine.common.Constant;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+
+import java.io.File;
+
+public class SeaTunnelConfig {
+
+ private static final ILogger LOGGER =
Logger.getLogger(SeaTunnelConfig.class);
+
+ private final EngineConfig engineConfig = new EngineConfig();
+
+ static {
+ String value = seatunnelHome();
+ LOGGER.info("seatunnel.home is " + value);
+ System.setProperty(SeaTunnelProperties.SEATUNNEL_HOME.getName(),
value);
+ }
+
+ private Config hazelcastConfig;
+
+ public SeaTunnelConfig() {
+ hazelcastConfig = new Config();
+ hazelcastConfig.getNetworkConfig().getJoin().getMulticastConfig()
+ .setMulticastPort(Constant.DEFAULT_SEATUNNEL_MULTICAST_PORT);
+
hazelcastConfig.setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
+ hazelcastConfig.getHotRestartPersistenceConfig()
+ .setBaseDir(new File(seatunnelHome(),
"recovery").getAbsoluteFile());
+ }
+
+ /**
+ * Returns the absolute path for seatunnel.home based from the system
property
+ * {@link SeaTunnelProperties#SEATUNNEL_HOME}
+ */
+ private static String seatunnelHome() {
+ return new
File(System.getProperty(SeaTunnelProperties.SEATUNNEL_HOME.getName(),
+
SeaTunnelProperties.SEATUNNEL_HOME.getDefaultValue())).getAbsolutePath();
+ }
+
+ public Config getHazelcastConfig() {
+ return hazelcastConfig;
+ }
+
+ public void setHazelcastConfig(Config hazelcastConfig) {
+ this.hazelcastConfig = hazelcastConfig;
+ }
+
+ public EngineConfig getEngineConfig() {
+ return engineConfig;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java
new file mode 100644
index 000000000..0fecdb0e6
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfigSections.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+/**
+ * Configuration sections for Hazelcast SeaTunnel shared by YAML based
+ * configurations
+ */
+enum SeaTunnelConfigSections {
+ SEATUNNEL("seatunnel", false),
+ ENGINE("engine", false);
+
+ final String name;
+ final boolean multipleOccurrence;
+
+ SeaTunnelConfigSections(String name, boolean multipleOccurrence) {
+ this.name = name;
+ this.multipleOccurrence = multipleOccurrence;
+
+ }
+
+ static boolean canOccurMultipleTimes(String name) {
+ for (SeaTunnelConfigSections element : values()) {
+ if (name.equals(element.name)) {
+ return element.multipleOccurrence;
+ }
+ }
+ return false;
+ }
+
+ boolean isEqual(String name) {
+ return this.name.equals(name);
+ }
+
+}
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelProperties.java
similarity index 68%
copy from
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
copy to
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelProperties.java
index 50d2e435e..05885dba8 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelProperties.java
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.config;
+package org.apache.seatunnel.engine.common.config;
-public enum EngineType {
- SPARK("spark"),
- FLINK("flink"),
- ;
+import com.hazelcast.spi.properties.HazelcastProperty;
- private final String engine;
-
- EngineType(String engine) {
- this.engine = engine;
- }
+/**
+ * Defines the names and default values for internal Hazelcast SeaTunnel
properties.
+ */
+public final class SeaTunnelProperties {
+ public static final HazelcastProperty SEATUNNEL_HOME = new
HazelcastProperty("seatunnel.home", "");
- public String getEngine() {
- return engine;
+ private SeaTunnelProperties() {
}
}
+
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java
new file mode 100644
index 000000000..b4cea0d97
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigBuilder.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import static com.hazelcast.internal.config.yaml.W3cDomUtil.asW3cNode;
+
+import com.hazelcast.config.AbstractYamlConfigBuilder;
+import com.hazelcast.config.InvalidConfigurationException;
+import com.hazelcast.internal.config.yaml.YamlDomChecker;
+import com.hazelcast.internal.nio.IOUtil;
+import com.hazelcast.internal.yaml.YamlLoader;
+import com.hazelcast.internal.yaml.YamlMapping;
+import com.hazelcast.internal.yaml.YamlNode;
+import com.hazelcast.jet.impl.util.ExceptionUtil;
+import lombok.NonNull;
+import org.w3c.dom.Node;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+public class YamlSeaTunnelConfigBuilder extends AbstractYamlConfigBuilder {
+
+ private final InputStream in;
+
+ public YamlSeaTunnelConfigBuilder() {
+ this((YamlSeaTunnelConfigLocator) null);
+ }
+
+ public YamlSeaTunnelConfigBuilder(YamlSeaTunnelConfigLocator locator) {
+ if (locator == null) {
+ locator = new YamlSeaTunnelConfigLocator();
+ locator.locateEverywhere();
+ }
+ this.in = locator.getIn();
+ }
+
+ public YamlSeaTunnelConfigBuilder(@NonNull InputStream inputStream) {
+ this.in = inputStream;
+ }
+
+ @Override
+ protected String getConfigRoot() {
+ return SeaTunnelConfigSections.SEATUNNEL.name;
+ }
+
+ public SeaTunnelConfig build() {
+ return build(new SeaTunnelConfig());
+ }
+
+ public SeaTunnelConfig build(SeaTunnelConfig config) {
+ try {
+ parseAndBuildConfig(config);
+ } catch (Exception e) {
+ throw ExceptionUtil.rethrow(e);
+ }
+
config.setHazelcastConfig(ConfigProvider.locateAndGetMemberConfig(getProperties()));
+ return config;
+ }
+
+ private void parseAndBuildConfig(SeaTunnelConfig config) throws Exception {
+ YamlMapping yamlRootNode;
+ try {
+ yamlRootNode = (YamlMapping) YamlLoader.load(in);
+ } catch (Exception ex) {
+ throw new InvalidConfigurationException("Invalid YAML
configuration", ex);
+ } finally {
+ IOUtil.closeResource(in);
+ }
+
+ YamlNode seatunnelRoot =
yamlRootNode.childAsMapping(SeaTunnelConfigSections.SEATUNNEL.name);
+ if (seatunnelRoot == null) {
+ seatunnelRoot = yamlRootNode;
+ }
+
+ YamlDomChecker.check(seatunnelRoot);
+
+ Node w3cRootNode = asW3cNode(seatunnelRoot);
+ replaceVariables(w3cRootNode);
+ importDocuments(seatunnelRoot);
+
+ new YamlSeaTunnelDomConfigProcessor(true,
config).buildConfig(w3cRootNode);
+ }
+
+ public YamlSeaTunnelConfigBuilder setProperties(Properties properties) {
+ if (properties == null) {
+ properties = System.getProperties();
+ }
+ setPropertiesInternal(properties);
+ return this;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigLocator.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigLocator.java
new file mode 100644
index 000000000..7743195b3
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigLocator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import static
com.hazelcast.internal.config.DeclarativeConfigUtil.YAML_ACCEPTED_SUFFIXES;
+
+import org.apache.seatunnel.engine.common.Constant;
+
+import com.hazelcast.internal.config.AbstractConfigLocator;
+
+/**
+ * A support class for the {@link YamlSeaTunnelConfigBuilder} to locate the
+ * yaml configuration.
+ */
+public final class YamlSeaTunnelConfigLocator extends AbstractConfigLocator {
+
+ public YamlSeaTunnelConfigLocator() {
+ }
+
+ @Override
+ public boolean locateFromSystemProperty() {
+ return loadFromSystemProperty(Constant.SYSPROP_SEATUNNEL_CONFIG,
YAML_ACCEPTED_SUFFIXES);
+ }
+
+ @Override
+ protected boolean locateFromSystemPropertyOrFailOnUnacceptedSuffix() {
+ return
loadFromSystemPropertyOrFailOnUnacceptedSuffix(Constant.SYSPROP_SEATUNNEL_CONFIG,
+ YAML_ACCEPTED_SUFFIXES);
+ }
+
+ @Override
+ protected boolean locateInWorkDir() {
+ return
loadFromWorkingDirectory(Constant.HAZELCAST_SEATUNNEL_CONF_FILE_PREFIX,
YAML_ACCEPTED_SUFFIXES);
+ }
+
+ @Override
+ protected boolean locateOnClasspath() {
+ return
loadConfigurationFromClasspath(Constant.HAZELCAST_SEATUNNEL_CONF_FILE_PREFIX,
YAML_ACCEPTED_SUFFIXES);
+ }
+
+ @Override
+ public boolean locateDefault() {
+
loadDefaultConfigurationFromClasspath(Constant.HAZELCAST_SEATUNNEL_DEFAULT_YAML);
+ return true;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
new file mode 100644
index 000000000..5531db478
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import static com.hazelcast.internal.config.DomConfigHelper.childElements;
+import static com.hazelcast.internal.config.DomConfigHelper.cleanNodeName;
+import static com.hazelcast.internal.config.DomConfigHelper.getIntegerValue;
+
+import com.hazelcast.config.InvalidConfigurationException;
+import com.hazelcast.internal.config.AbstractDomConfigProcessor;
+import org.w3c.dom.Node;
+
+public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor {
+
+ private final SeaTunnelConfig config;
+
+ YamlSeaTunnelDomConfigProcessor(boolean domLevel3, SeaTunnelConfig config)
{
+ super(domLevel3);
+ this.config = config;
+ }
+
+ @Override
+ public void buildConfig(Node rootNode) {
+ for (Node node : childElements(rootNode)) {
+ String nodeName = cleanNodeName(node);
+ if (occurrenceSet.contains(nodeName)) {
+ throw new InvalidConfigurationException(
+ "Duplicate '" + nodeName + "' definition found in the
configuration.");
+ }
+ if (handleNode(node, nodeName)) {
+ continue;
+ }
+ if (!SeaTunnelConfigSections.canOccurMultipleTimes(nodeName)) {
+ occurrenceSet.add(nodeName);
+ }
+ }
+ }
+
+ private boolean handleNode(Node node, String name) {
+ if (SeaTunnelConfigSections.ENGINE.isEqual(name)) {
+ parseEngineConfig(node, config);
+ } else {
+ return true;
+ }
+ return false;
+ }
+
+ @SuppressWarnings("checkstyle:RegexpSinglelineJava")
+ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
+ final EngineConfig engineConfig = config.getEngineConfig();
+ for (Node node : childElements(engineNode)) {
+ String name = cleanNodeName(node);
+ switch (name) {
+ case "backup-count":
+ engineConfig.setBackupCount(
+ getIntegerValue("backup-count", getTextContent(node))
+ );
+ break;
+ default:
+ throw new AssertionError("Unrecognized element: " + name);
+ }
+ }
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
index 7e1ed660e..a3c211af5 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
@@ -1,12 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml
new file mode 100644
index 000000000..a5fe3bc42
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast-client:
+ cluster-name: seatunnel
+
+ network:
+ cluster-members:
+ - localhost:5801
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
new file mode 100644
index 000000000..bde537011
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:
+ cluster-name: seatunnel
+ network:
+ join:
+ multicast:
+ enabled: true
+ multicast-group: 224.2.2.3
+ multicast-port: 53328
+ port:
+ auto-increment: true
+ port-count: 100
+ port: 5801
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel-default.yaml
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel-default.yaml
new file mode 100644
index 000000000..74f842755
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel-default.yaml
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+ engine:
+ backup-count: 1
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
index 21d6dc8b8..58227776e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/AsyncOperation.java
@@ -1,12 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
index 33c829f75..6617a9459 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/AbstractSeaTunnelMessageTask.java
@@ -1,12 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,