This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new adf2132d2c [Improve][Zeta] Add verification for Env options (#9569)
adf2132d2c is described below

commit adf2132d2cc24887acbf6b594977681af1adfda1
Author: chestnufang <[email protected]>
AuthorDate: Fri Jul 18 10:07:57 2025 +0800

    [Improve][Zeta] Add verification for Env options (#9569)
---
 .../client/MultipleTableJobConfigParserTest.java   | 19 +++++++
 ...atch_fake_to_console_with_error_env_option.conf | 58 ++++++++++++++++++++++
 .../core/parse/MultipleTableJobConfigParser.java   |  4 ++
 .../seatunnel/engine/server/master/JobMaster.java  |  7 +--
 4 files changed, 85 insertions(+), 3 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index 11c7011a6a..6d259f557c 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -223,4 +223,23 @@ public class MultipleTableJobConfigParserTest {
         Assertions.assertNotEquals(classLoaders[0], classLoaders[2]);
         Assertions.assertNotEquals(classLoaders[1], classLoaders[2]);
     }
+
+    @Test
+    public void testMultipleTableJobConfigWithEnvOptionCheck() {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath =
+                ContentFormatUtilTest.getResource(
+                        "/batch_fake_to_console_with_error_env_option.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setJobContext(new JobContext());
+        Config config = ConfigBuilder.of(Paths.get(filePath));
+
+        Exception checkExp = null;
+        try {
+            new MultipleTableJobConfigParser(config, new IdGenerator(), jobConfig);
+        } catch (Exception e) {
+            checkExp = e;
+        }
+        Assertions.assertInstanceOf(IllegalArgumentException.class, checkExp);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console_with_error_env_option.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console_with_error_env_option.conf
new file mode 100644
index 0000000000..3de4136cb4
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console_with_error_env_option.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = 111
+  checkpoint.interval = null
+  jars=123
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+  FakeSource {
+    plugin_output = "fake"
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+  sql {
+    plugin_input = "fake"
+    query = "select 1 from dual"
+    plugin_output = "fake2"
+  }
+}
+
+sink {
+  console {
+    plugin_input="fake2"
+  }
+  console {
+    plugin_input="fake3"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index d2b47f5009..377040ff95 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -23,8 +23,10 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.options.EnvCommonOptions;
+import org.apache.seatunnel.api.options.EnvOptionRule;
 import org.apache.seatunnel.api.sink.SaveModeExecuteLocation;
 import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
@@ -160,6 +162,7 @@ public class MultipleTableJobConfigParser {
         this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath), variables);
         this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
         this.pipelineCheckpoints = pipelineCheckpoints;
+        ConfigValidator.of(this.envOptions).validate(new EnvOptionRule().optionRule());
     }
 
     public MultipleTableJobConfigParser(
@@ -176,6 +179,7 @@ public class MultipleTableJobConfigParser {
         this.seaTunnelJobConfig = seaTunnelJobConfig;
         this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
         this.pipelineCheckpoints = pipelineCheckpoints;
+        ConfigValidator.of(this.envOptions).validate(new EnvOptionRule().optionRule());
     }
 
     public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoaderService) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 2560c1104a..3dab4a1231 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -347,10 +347,11 @@ public class JobMaster {
                 defaultCheckpointConfig.getStorage().getMaxRetainedCheckpoints());
         jobCheckpointConfig.setStorage(jobCheckpointStorageConfig);
 
-        if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+        Optional<Object> checkpointIntervalOptional =
+                Optional.ofNullable(jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
+        if (checkpointIntervalOptional.isPresent()) {
             jobCheckpointConfig.setCheckpointInterval(
-                    Long.parseLong(
-                            jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()).toString()));
+                    Long.parseLong(checkpointIntervalOptional.get().toString()));
         } else if (jobConfig.getJobContext().getJobMode() == BATCH) {
             LOGGER.info(
                     "in batch mode, the 'checkpoint.interval' configuration of env is missing, so checkpoint will be disabled");

Reply via email to