This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new adf2132d2c [Improve][Zeta] Add verification for Env options (#9569)
adf2132d2c is described below
commit adf2132d2cc24887acbf6b594977681af1adfda1
Author: chestnufang <[email protected]>
AuthorDate: Fri Jul 18 10:07:57 2025 +0800
[Improve][Zeta] Add verification for Env options (#9569)
---
.../client/MultipleTableJobConfigParserTest.java | 19 +++++++
...atch_fake_to_console_with_error_env_option.conf | 58 ++++++++++++++++++++++
.../core/parse/MultipleTableJobConfigParser.java | 4 ++
.../seatunnel/engine/server/master/JobMaster.java | 7 +--
4 files changed, 85 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index 11c7011a6a..6d259f557c 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -223,4 +223,23 @@ public class MultipleTableJobConfigParserTest {
Assertions.assertNotEquals(classLoaders[0], classLoaders[2]);
Assertions.assertNotEquals(classLoaders[1], classLoaders[2]);
}
+
+ @Test
+ public void testMultipleTableJobConfigWithEnvOptionCheck() {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath =
+ ContentFormatUtilTest.getResource(
+ "/batch_fake_to_console_with_error_env_option.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setJobContext(new JobContext());
+ Config config = ConfigBuilder.of(Paths.get(filePath));
+
+ Exception checkExp = null;
+ try {
+ new MultipleTableJobConfigParser(config, new IdGenerator(),
jobConfig);
+ } catch (Exception e) {
+ checkExp = e;
+ }
+ Assertions.assertInstanceOf(IllegalArgumentException.class, checkExp);
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console_with_error_env_option.conf
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console_with_error_env_option.conf
new file mode 100644
index 0000000000..3de4136cb4
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console_with_error_env_option.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = 111
+ checkpoint.interval = null
+ jars=123
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ plugin_output = "fake"
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+ sql {
+ plugin_input = "fake"
+ query = "select 1 from dual"
+ plugin_output = "fake2"
+ }
+}
+
+sink {
+ console {
+ plugin_input="fake2"
+ }
+ console {
+ plugin_input="fake3"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index d2b47f5009..377040ff95 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -23,8 +23,10 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.options.EnvCommonOptions;
+import org.apache.seatunnel.api.options.EnvOptionRule;
import org.apache.seatunnel.api.sink.SaveModeExecuteLocation;
import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
@@ -160,6 +162,7 @@ public class MultipleTableJobConfigParser {
this.seaTunnelJobConfig =
ConfigBuilder.of(Paths.get(jobDefineFilePath), variables);
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
this.pipelineCheckpoints = pipelineCheckpoints;
+ ConfigValidator.of(this.envOptions).validate(new
EnvOptionRule().optionRule());
}
public MultipleTableJobConfigParser(
@@ -176,6 +179,7 @@ public class MultipleTableJobConfigParser {
this.seaTunnelJobConfig = seaTunnelJobConfig;
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
this.pipelineCheckpoints = pipelineCheckpoints;
+ ConfigValidator.of(this.envOptions).validate(new
EnvOptionRule().optionRule());
}
public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService
classLoaderService) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 2560c1104a..3dab4a1231 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -347,10 +347,11 @@ public class JobMaster {
defaultCheckpointConfig.getStorage().getMaxRetainedCheckpoints());
jobCheckpointConfig.setStorage(jobCheckpointStorageConfig);
- if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+ Optional<Object> checkpointIntervalOptional =
+
Optional.ofNullable(jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
+ if (checkpointIntervalOptional.isPresent()) {
jobCheckpointConfig.setCheckpointInterval(
- Long.parseLong(
-
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()).toString()));
+
Long.parseLong(checkpointIntervalOptional.get().toString()));
} else if (jobConfig.getJobContext().getJobMode() == BATCH) {
LOGGER.info(
"in batch mode, the 'checkpoint.interval' configuration of
env is missing, so checkpoint will be disabled");