This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new 70e526a2 [Feature][Seatunnel-web]Add support to configure placeholder 
with default value in the job config. (#208)
70e526a2 is described below

commit 70e526a2d38a4fe1243dc0e9d74edeb4f3826941
Author: Mohammad Arshad <ars...@apache.org>
AuthorDate: Mon Sep 9 07:55:19 2024 +0530

    [Feature][Seatunnel-web]Add support to configure placeholder with default 
value in the job config. (#208)
---
 .../app/domain/request/job/JobExecParam.java       |   6 +-
 .../app/service/impl/JobExecutorServiceImpl.java   |   2 +-
 .../app/service/impl/JobInstanceServiceImpl.java   |  14 +--
 .../org/apache/seatunnel/app/utils/JobUtils.java   |  74 ++++++-------
 .../apache/seatunnel/app/utils/JobUtilsTests.java  | 120 +++++++++++++++++++++
 .../server/common/SeatunnelErrorEnum.java          |   4 +-
 .../app/controller/JobControllerWrapper.java       |  15 ---
 .../seatunnel/app/test/JobControllerTest.java      |  45 +++-----
 .../app/test/JobExecutorControllerTest.java        |  77 +++----------
 .../seatunnel/app/utils/JobTestingUtils.java       |  41 +++++++
 .../resources/jobs/fake_source_console_job.json    |  52 ++++++++-
 .../src/test/resources/logback-spring.xml          |   2 +-
 12 files changed, 276 insertions(+), 176 deletions(-)

diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
index 223429cc..3aa54ecd 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java
@@ -27,10 +27,8 @@ import java.util.Map;
 @AllArgsConstructor
 // Job execution parameters
 public class JobExecParam {
-    // job name -> key -> value
-    private Map<String, String> env;
-    // task name -> key -> value
-    private Map<String, Map<String, String>> tasks;
+    // job config placeholder name -> value
+    private Map<String, String> placeholderValues;
     // task name -> new datasource id
     private Map<String, String> datasource;
 }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index ec1b37de..185222eb 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -80,7 +80,7 @@ public class JobExecutorServiceImpl implements 
IJobExecutorService {
             return Result.success(executeResource.getJobInstanceId());
         } catch (RuntimeException e) {
             Result<Long> failure =
-                    
Result.failure(SeatunnelErrorEnum.JUB_EXEC_SUBMISSION_ERROR, e.getMessage());
+                    
Result.failure(SeatunnelErrorEnum.JOB_EXEC_SUBMISSION_ERROR, e.getMessage());
             // Even though job execution submission failed, we still need to 
return the
             // jobInstanceId to the user
             // as the job instance has been created in the database.
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
index d58d19b4..70b1201e 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
@@ -174,7 +174,6 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
         BusinessMode businessMode =
                 
BusinessMode.valueOf(jobDefinitionDao.getJob(jobId).getJobType());
         Config envConfig = filterEmptyValue(ConfigFactory.parseString(envStr));
-        envConfig = JobUtils.updateEnvConfig(executeParam, envConfig);
         JobUtils.updateDataSource(executeParam, tasks);
 
         Map<String, List<Config>> sourceMap = new LinkedHashMap<>();
@@ -230,8 +229,6 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                                                         
ParsingMode.SHARDING.name()));
                             }
 
-                            config =
-                                    JobUtils.updateTaskConfig(executeParam, 
config, task.getName());
                             Config mergeConfig =
                                     mergeTaskConfig(
                                             task,
@@ -240,9 +237,6 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                                             businessMode,
                                             config,
                                             optionRule);
-                            mergeConfig =
-                                    JobUtils.updateQueryTaskConfig(
-                                            executeParam, mergeConfig, 
task.getName());
                             sourceMap
                                     .get(task.getConnectorType())
                                     .add(filterEmptyValue(mergeConfig));
@@ -272,9 +266,6 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                         }
                         List<TableSchemaReq> inputSchemas = 
findInputSchemas(tasks, lines, task);
                         Config transformConfig = buildTransformConfig(task, 
config, inputSchemas);
-                        transformConfig =
-                                JobUtils.updateTaskConfig(
-                                        executeParam, transformConfig, 
task.getName());
                         transformMap
                                 .get(task.getConnectorType())
                                 .add(filterEmptyValue(transformConfig));
@@ -289,8 +280,6 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                             if (!sinkMap.containsKey(task.getConnectorType())) 
{
                                 sinkMap.put(task.getConnectorType(), new 
ArrayList<>());
                             }
-                            config =
-                                    JobUtils.updateTaskConfig(executeParam, 
config, task.getName());
                             Config mergeConfig =
                                     mergeTaskConfig(
                                             task,
@@ -341,7 +330,8 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                                         .setJson(false)
                                         .setComments(false)
                                         .setOriginComments(false));
-        return SeaTunnelConfigUtil.generateConfig(env, sources, transforms, 
sinks);
+        String jobConfig = SeaTunnelConfigUtil.generateConfig(env, sources, 
transforms, sinks);
+        return JobUtils.replaceJobConfigPlaceholders(jobConfig, executeParam);
     }
 
     @Override
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
index bfa36830..b51db986 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
@@ -16,20 +16,23 @@
  */
 package org.apache.seatunnel.app.utils;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
 import org.apache.seatunnel.app.dal.entity.JobTask;
 import org.apache.seatunnel.app.domain.request.job.JobExecParam;
 import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
+import org.apache.seatunnel.server.common.SeatunnelException;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class JobUtils {
 
     // The maximum length of the job execution error message, 4KB
     private static final int ERROR_MESSAGE_MAX_LENGTH = 4096;
+    private static final Pattern placeholderPattern = 
Pattern.compile("\\$\\{(\\w+)(?::(.*?))?\\}");
 
     public static String getJobInstanceErrorMessage(String message) {
         if (message == null) {
@@ -40,46 +43,6 @@ public class JobUtils {
                 : message;
     }
 
-    public static Config updateEnvConfig(JobExecParam jobExecParam, Config 
envConfig) {
-        if (jobExecParam == null || jobExecParam.getEnv() == null) {
-            return envConfig;
-        }
-        return updateConfig(envConfig, jobExecParam.getEnv());
-    }
-
-    private static Config updateConfig(Config config, Map<String, String> 
properties) {
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            config =
-                    config.withValue(
-                            entry.getKey(), 
ConfigValueFactory.fromAnyRef(entry.getValue()));
-        }
-        return config;
-    }
-
-    public static Config updateTaskConfig(
-            JobExecParam jobExecParam, Config taskConfig, String taskName) {
-        if (jobExecParam == null
-                || jobExecParam.getTasks() == null
-                || jobExecParam.getTasks().get(taskName) == null) {
-            return taskConfig;
-        }
-        return updateConfig(taskConfig, jobExecParam.getTasks().get(taskName));
-    }
-
-    public static Config updateQueryTaskConfig(
-            JobExecParam jobExecParam, Config taskConfig, String taskName) {
-        if (jobExecParam == null
-                || jobExecParam.getTasks() == null
-                || jobExecParam.getTasks().get(taskName) == null) {
-            return taskConfig;
-        }
-        String query = jobExecParam.getTasks().get(taskName).get("query");
-        if (query != null) {
-            return taskConfig.withValue("query", 
ConfigValueFactory.fromAnyRef(query));
-        }
-        return taskConfig;
-    }
-
     public static void updateDataSource(JobExecParam jobExecParam, 
List<JobTask> tasks) {
         if (jobExecParam == null || jobExecParam.getDatasource() == null) {
             return;
@@ -104,4 +67,29 @@ public class JobUtils {
                 || JobStatus.CANCELED == jobStatus
                 || JobStatus.FAILED == jobStatus;
     }
+
+    // Replace placeholders in job config with actual values
+    public static String replaceJobConfigPlaceholders(
+            String jobConfigString, JobExecParam jobExecParam) {
+        Map<String, String> placeholderValues =
+                (jobExecParam != null && jobExecParam.getPlaceholderValues() 
!= null)
+                        ? jobExecParam.getPlaceholderValues()
+                        : Collections.emptyMap();
+
+        Matcher matcher = placeholderPattern.matcher(jobConfigString);
+        StringBuffer result = new StringBuffer();
+
+        while (matcher.find()) {
+            String placeholderName = matcher.group(1);
+            String replacement = 
placeholderValues.getOrDefault(placeholderName, matcher.group(2));
+            if (replacement == null) {
+                throw new SeatunnelException(
+                        SeatunnelErrorEnum.JOB_NO_VALUE_FOUND_FOR_PLACEHOLDER, 
placeholderName);
+            }
+            matcher.appendReplacement(result, replacement);
+        }
+
+        matcher.appendTail(result);
+        return result.toString();
+    }
 }
diff --git 
a/seatunnel-server/seatunnel-app/src/test/java/org/apache/seatunnel/app/utils/JobUtilsTests.java
 
b/seatunnel-server/seatunnel-app/src/test/java/org/apache/seatunnel/app/utils/JobUtilsTests.java
new file mode 100644
index 00000000..31c37fb9
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/test/java/org/apache/seatunnel/app/utils/JobUtilsTests.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.utils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.app.domain.request.job.JobExecParam;
+import org.apache.seatunnel.server.common.SeatunnelException;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class JobUtilsTests {
+
+    @Test
+    public void 
testReplaceJobConfigPlaceholders_AllJobConfigPlaceholdersReplaced() {
+        String jobConfigContent =
+                
"job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobNameParam}";
+        Map<String, String> paramValues = new HashMap<>();
+        paramValues.put("jobModeParam", "STREAMING");
+        paramValues.put("jobNameParam", "newJob");
+        JobExecParam jobExecParam = getJobExecParam(paramValues);
+
+        String expected = 
"job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob";
+        String actual = 
JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void 
testReplaceJobConfigPlaceholders_JobConfig_PlaceholdersRepeat() {
+        String jobConfigContent =
+                
"job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobModeParam}";
+        Map<String, String> paramValues = new HashMap<>();
+        paramValues.put("jobModeParam", "STREAMING");
+        JobExecParam jobExecParam = getJobExecParam(paramValues);
+
+        String expected = 
"job.mode=STREAMING\ncheckpoint.interval=30\njob.name=STREAMING";
+        String actual = 
JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testReplaceJobConfigPlaceholdersUsed() {
+        String jobConfigContent =
+                
"job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobNameParam:DefaultJob}";
+        Map<String, String> paramValues = new HashMap<>();
+        paramValues.put("jobModeParam", "STREAMING");
+        JobExecParam jobExecParam = getJobExecParam(paramValues);
+
+        String expected = 
"job.mode=STREAMING\ncheckpoint.interval=30\njob.name=DefaultJob";
+        String actual = 
JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void 
testReplaceJobConfigPlaceholders_NoDefaultValueThrowsException() {
+        String jobConfigContent =
+                
"job.mode=${jobModeParam}\ncheckpoint.interval=30\njob.name=${jobNameParam}";
+        Map<String, String> paramValues = new HashMap<>();
+        paramValues.put("jobModeParam", "STREAMING");
+        JobExecParam jobExecParam = getJobExecParam(paramValues);
+
+        assertThrows(
+                SeatunnelException.class,
+                () -> {
+                    JobUtils.replaceJobConfigPlaceholders(jobConfigContent, 
jobExecParam);
+                });
+    }
+
+    @Test
+    public void testReplaceJobConfigPlaceholders_NoJobConfigPlaceholders() {
+        String jobConfigContent = 
"job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob";
+        Map<String, String> paramValues = new HashMap<>();
+        JobExecParam jobExecParam = getJobExecParam(paramValues);
+
+        String expected = 
"job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob";
+        String actual = 
JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testParseConfigWithPlaceHolders() {
+        String transformConfig =
+                
"{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"${logPrintDelayMs:100}\"}";
+        Config config = ConfigFactory.parseString(transformConfig);
+        assertNotNull(config);
+    }
+
+    private static @NotNull JobExecParam getJobExecParam(Map<String, String> 
paramValues) {
+        JobExecParam jobExecParam = new JobExecParam();
+        jobExecParam.setPlaceholderValues(paramValues);
+        return jobExecParam;
+    }
+}
diff --git 
a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
 
b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
index dc70cb27..53dde9ad 100644
--- 
a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
+++ 
b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
@@ -82,9 +82,11 @@ public enum SeatunnelErrorEnum {
             "load job state from engine error",
             "load job statue from engine [%s] error, error msg is [%s]"),
     UNSUPPORTED_ENGINE(40003, "unsupported engine", "unsupported engine [%s] 
version [%s]"),
-    JUB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"),
+    JOB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"),
     LOAD_ENGINE_METRICS_ERROR(
             40005, "load engine metrics error", "load engine metrics error. 
error msg is [%s]"),
+    JOB_NO_VALUE_FOUND_FOR_PLACEHOLDER(
+            40006, "No value found for placeholder", "No value found for 
placeholder: [%s]"),
 
     JOB_RUN_GENERATE_UUID_ERROR(50001, "generate uuid error", "generate uuid 
error"),
     /* datasource and virtual table */
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
index b899835f..be654fba 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java
@@ -25,10 +25,6 @@ import org.apache.seatunnel.app.utils.JSONUtils;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-
 public class JobControllerWrapper extends SeatunnelWebTestingBase {
 
     public Result<Long> createJob(JobCreateReq jobCreateRequest) {
@@ -48,15 +44,4 @@ public class JobControllerWrapper extends 
SeatunnelWebTestingBase {
         String response = sendRequest(urlWithParam("job/get/" + jobVersionId + 
"?"), null, "GET");
         return JSONTestUtils.parseObject(response, new 
TypeReference<Result<JobRes>>() {});
     }
-
-    public JobCreateReq populateJobCreateReqFromFile() {
-        String filePath = 
"src/test/resources/jobs/fake_source_console_job.json";
-        String jsonContent;
-        try {
-            jsonContent = new String(Files.readAllBytes(Paths.get(filePath)));
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class);
-    }
 }
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
index c963324c..6b51a94c 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
@@ -20,7 +20,6 @@ import org.apache.seatunnel.app.common.Result;
 import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
 import org.apache.seatunnel.app.controller.JobControllerWrapper;
 import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
-import 
org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
 import org.apache.seatunnel.app.domain.request.job.JobConfig;
 import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
 import org.apache.seatunnel.app.domain.request.job.JobDAG;
@@ -45,7 +44,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class JobControllerTest {
     private static final SeaTunnelWebCluster seaTunnelWebCluster = new 
SeaTunnelWebCluster();
-    private static SeatunnelDatasourceControllerWrapper 
seatunnelDatasourceControllerWrapper;
     private static JobControllerWrapper jobControllerWrapper;
     private static JobExecutorControllerWrapper jobExecutorControllerWrapper;
     private static final String uniqueId = "_" + System.currentTimeMillis();
@@ -53,7 +51,6 @@ public class JobControllerTest {
     @BeforeAll
     public static void setUp() {
         seaTunnelWebCluster.start();
-        seatunnelDatasourceControllerWrapper = new 
SeatunnelDatasourceControllerWrapper();
         jobControllerWrapper = new JobControllerWrapper();
         jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
     }
@@ -61,11 +58,9 @@ public class JobControllerTest {
     @Test
     public void createJobWithSingleAPI_shouldExecuteSuccessfully() {
         String jobName = "jobWithSingleAPI" + uniqueId;
-        JobCreateReq jobCreateReq = 
jobControllerWrapper.populateJobCreateReqFromFile();
-        jobCreateReq.getJobConfig().setName(jobName);
-        jobCreateReq.getJobConfig().setDescription(jobName + " description");
-        setSourceIds(jobCreateReq, "fake_source_create" + uniqueId, 
"console_create" + uniqueId);
-
+        JobCreateReq jobCreateReq =
+                JobTestingUtils.populateJobCreateReqFromFile(
+                        jobName, "fake_source_create" + uniqueId, 
"console_create" + uniqueId);
         Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
         assertTrue(job.isSuccess());
         Result<Long> result = 
jobExecutorControllerWrapper.jobExecutor(job.getData());
@@ -79,25 +74,12 @@ public class JobControllerTest {
         assertEquals(5, listResult.getData().get(0).getWriteRowCount());
     }
 
-    private void setSourceIds(
-            JobCreateReq jobCreateReq, String fsdSourceName, String 
csSourceName) {
-        // Set the data source id for the plugin configs
-        String fakeSourceDataSourceId =
-                
seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(fsdSourceName);
-        String consoleDataSourceId =
-                
seatunnelDatasourceControllerWrapper.createConsoleDatasource(csSourceName);
-        for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
-            if (pluginConfig.getName().equals("source-fake-source")) {
-                
pluginConfig.setDataSourceId(Long.parseLong(fakeSourceDataSourceId));
-            } else if (pluginConfig.getName().equals("sink-console")) {
-                
pluginConfig.setDataSourceId(Long.parseLong(consoleDataSourceId));
-            }
-        }
-    }
-
     @Test
     public void createJobWithSingleAPI_ValidateInput() {
-        JobCreateReq jobCreateReq = 
jobControllerWrapper.populateJobCreateReqFromFile();
+        String jobName = "jobWithSingleAPI2" + uniqueId;
+        JobCreateReq jobCreateReq =
+                JobTestingUtils.populateJobCreateReqFromFile(
+                        jobName, "fake_source_create_2" + uniqueId, 
"console_create_2" + uniqueId);
         JobConfig jobConfig = jobCreateReq.getJobConfig();
         jobConfig.setName("");
         Result<Long> result = jobControllerWrapper.createJob(jobCreateReq);
@@ -105,7 +87,7 @@ public class JobControllerTest {
         assertEquals(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL.getCode(), 
result.getCode());
         assertEquals("param [name] can not be null or empty", result.getMsg());
 
-        String jobName = "jobValidation" + uniqueId;
+        jobName = "jobValidation" + uniqueId;
         jobConfig.setName(jobName);
         jobConfig.setDescription(null);
         result = jobControllerWrapper.createJob(jobCreateReq);
@@ -129,7 +111,8 @@ public class JobControllerTest {
                 result.getMsg());
 
         jobConfig.getEnv().put("job.mode", "BATCH");
-        setSourceIds(jobCreateReq, "fake_source_create2" + uniqueId, 
"console_create2" + uniqueId);
+        // setSourceIds(jobCreateReq, "fake_source_create2" + uniqueId, 
"console_create2" +
+        // uniqueId);
         result = jobControllerWrapper.createJob(jobCreateReq);
         assertTrue(result.isSuccess());
         assertEquals(0, result.getCode());
@@ -139,11 +122,9 @@ public class JobControllerTest {
     @Test
     public void testUpdateJob_ForValidAndInvalidScenarios() {
         String jobName = "updateJob_single_api" + uniqueId;
-        JobCreateReq jobCreateReq = 
jobControllerWrapper.populateJobCreateReqFromFile();
-        jobCreateReq.getJobConfig().setName(jobName);
-        jobCreateReq.getJobConfig().setDescription(jobName + " description");
-        setSourceIds(
-                jobCreateReq, "fake_source_update_job" + uniqueId, 
"console_update_job" + uniqueId);
+        JobCreateReq jobCreateReq =
+                JobTestingUtils.populateJobCreateReqFromFile(
+                        jobName, "fake_source_create_3" + uniqueId, 
"console_create_3" + uniqueId);
 
         Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
         assertTrue(job.isSuccess());
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
index 6e1cb1ea..63488d11 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -81,7 +81,10 @@ public class JobExecutorControllerTest {
     @Test
     public void executeJobWithParameters() {
         String jobName = "execJobWithParam" + uniqueId;
-        long jobVersionId = JobTestingUtils.createJob(jobName);
+        JobCreateReq jobCreateReq =
+                JobTestingUtils.populateJobCreateReqFromFile(
+                        jobName, "fake_source_exec-1" + uniqueId, 
"console_exec-1" + uniqueId);
+        long jobVersionId = JobTestingUtils.createJob(jobCreateReq);
         Result<Long> result = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
         assertTrue(result.isSuccess());
         assertTrue(result.getData() > 0);
@@ -96,26 +99,19 @@ public class JobExecutorControllerTest {
         
assertTrue(generatedJobFile.contains("\"log.print.delay.ms\"=\"100\""));
 
         JobExecParam jobExecParam = new JobExecParam();
+        Map<String, String> placeholderValues = new HashMap<>();
 
-        Map<String, String> envConf = new HashMap<>();
-        envConf.put("job.name", "executeJobWithParameters");
-        jobExecParam.setEnv(envConf);
-        Map<String, Map<String, String>> tasks = new HashMap<>();
-        jobExecParam.setTasks(tasks);
-
+        // source configuration
         int numberOfRecords = 100;
-        Map<String, String> task1Config = new HashMap<>();
-        task1Config.put("row.num", String.valueOf(numberOfRecords));
-        tasks.put("source-fakesource", task1Config);
+        placeholderValues.put("rowNum", String.valueOf(numberOfRecords));
 
-        Map<String, String> task2Config = new HashMap<>();
-        task2Config.put("replace_first", "true");
-        task2Config.put("is_regex", "true");
-        tasks.put("transform-replace", task2Config);
+        // transform configuration
+        placeholderValues.put("firstReplace", "true");
+        placeholderValues.put("isRegex", "true");
 
-        Map<String, String> task3Config = new HashMap<>();
-        task3Config.put("log.print.delay.ms", "99");
-        tasks.put("sink-console", task3Config);
+        // sink configuration
+        placeholderValues.put("logPrintDelayMs", "99");
+        jobExecParam.setPlaceholderValues(placeholderValues);
 
         result = jobExecutorControllerWrapper.jobExecutor(jobVersionId, 
jobExecParam);
         assertTrue(result.isSuccess());
@@ -132,53 +128,6 @@ public class JobExecutorControllerTest {
         assertTrue(generatedJobFile.contains("\"replace_first\"=\"true\""));
         // database properties except query can not be updated
         
assertFalse(generatedJobFile.contains("\"log.print.delay.ms\"=\"99\""));
-        
assertTrue(generatedJobFile.contains("\"job.name\"=executeJobWithParameters"));
-    }
-
-    @Test
-    public void executeJobWithParameters_AllowQueryUpdate() {
-        String jobName = "execJobUpdateQuery" + uniqueId;
-        JobCreateReq jobCreateReq = 
JobTestingUtils.populateMySQLJobCreateReqFromFile();
-        jobCreateReq.getJobConfig().setName(jobName);
-        jobCreateReq.getJobConfig().setDescription(jobName + " description");
-        String datasourceName = "execJobUpdateQuery_db" + uniqueId;
-        String mysqlDatasourceId =
-                
seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName);
-        for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
-            pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId));
-        }
-        Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
-        assertTrue(job.isSuccess());
-        Long jobVersionId = job.getData();
-        Result<Long> result = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
-        // Fails because of the wrong credentials of the database.
-        assertFalse(result.isSuccess());
-        String generatedJobFile = 
getGenerateJobFile(String.valueOf(jobVersionId));
-        assertTrue(
-                generatedJobFile.contains(
-                        "query=\"SELECT `name`, `age` FROM 
`test`.`test_table`\""));
-        assertTrue(generatedJobFile.contains("user=someUser"));
-        assertTrue(generatedJobFile.contains("password=somePassword"));
-
-        JobExecParam jobExecParam = new JobExecParam();
-        Map<String, Map<String, String>> tasks = new HashMap<>();
-        jobExecParam.setTasks(tasks);
-
-        Map<String, String> task1Config = new HashMap<>();
-        task1Config.put("query", "SELECT `name`, `age` FROM 
`test`.`test_table` LIMIT 10");
-        task1Config.put("user", "otherUser");
-        task1Config.put("password", "otherPassword");
-        tasks.put("mysql_source_1", task1Config);
-
-        result = jobExecutorControllerWrapper.jobExecutor(jobVersionId, 
jobExecParam);
-        assertFalse(result.isSuccess());
-        // query should be changed but other database details should not be 
changed,
-        generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId));
-        assertTrue(
-                generatedJobFile.contains(
-                        "query=\"SELECT `name`, `age` FROM `test`.`test_table` 
LIMIT 10\""));
-        assertTrue(generatedJobFile.contains("user=someUser"));
-        assertTrue(generatedJobFile.contains("password=somePassword"));
     }
 
     @Test
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
index f4d6c525..e9dc309f 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
@@ -18,6 +18,7 @@ package org.apache.seatunnel.app.utils;
 
 import org.apache.seatunnel.app.common.Result;
 import org.apache.seatunnel.app.controller.JobConfigControllerWrapper;
+import org.apache.seatunnel.app.controller.JobControllerWrapper;
 import org.apache.seatunnel.app.controller.JobDefinitionControllerWrapper;
 import org.apache.seatunnel.app.controller.JobMetricsControllerWrapper;
 import org.apache.seatunnel.app.controller.JobTaskControllerWrapper;
@@ -26,6 +27,7 @@ import org.apache.seatunnel.app.domain.request.job.Edge;
 import org.apache.seatunnel.app.domain.request.job.JobConfig;
 import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
 import org.apache.seatunnel.app.domain.request.job.JobDAG;
+import org.apache.seatunnel.app.domain.request.job.PluginConfig;
 import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes;
 import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
 
@@ -49,6 +51,7 @@ public class JobTestingUtils {
             new JobTaskControllerWrapper();
     private static SeatunnelDatasourceControllerWrapper 
seatunnelDatasourceControllerWrapper =
             new SeatunnelDatasourceControllerWrapper();
+    private static JobControllerWrapper jobControllerWrapper = new 
JobControllerWrapper();
     private static final long TIMEOUT = 60; // 1 minute
     private static final long INTERVAL = 2; // 1 second
 
@@ -166,4 +169,42 @@ public class JobTestingUtils {
         }
         return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class);
     }
+
+    public static JobCreateReq populateJobCreateReqFromFile(
+            String jobName, String fsdSourceName, String csSourceName) {
+        String filePath = 
"src/test/resources/jobs/fake_source_console_job.json";
+        String jsonContent;
+        try {
+            jsonContent = new String(Files.readAllBytes(Paths.get(filePath)));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        JobCreateReq jobCreateReq = JSONTestUtils.parseObject(jsonContent, 
JobCreateReq.class);
+        jobCreateReq.getJobConfig().setName(jobName);
+        jobCreateReq.getJobConfig().setDescription(jobName + " description");
+        setSourceIds(jobCreateReq, fsdSourceName, csSourceName);
+        return jobCreateReq;
+    }
+
+    private static void setSourceIds(
+            JobCreateReq jobCreateReq, String fsdSourceName, String 
csSourceName) {
+        // Set the data source id for the plugin configs
+        String fakeSourceDataSourceId =
+                
seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(fsdSourceName);
+        String consoleDataSourceId =
+                
seatunnelDatasourceControllerWrapper.createConsoleDatasource(csSourceName);
+        for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
+            if (pluginConfig.getName().equals("source-fake-source")) {
+                
pluginConfig.setDataSourceId(Long.parseLong(fakeSourceDataSourceId));
+            } else if (pluginConfig.getName().equals("sink-console")) {
+                
pluginConfig.setDataSourceId(Long.parseLong(consoleDataSourceId));
+            }
+        }
+    }
+
+    public static Long createJob(JobCreateReq jobCreateReq) {
+        Result<Long> jobCreateResult = 
jobControllerWrapper.createJob(jobCreateReq);
+        assertTrue(jobCreateResult.isSuccess());
+        return jobCreateResult.getData();
+    }
 }
diff --git 
a/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json 
b/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json
index acab2f66..324c1d9b 100644
--- a/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json
+++ b/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json
@@ -5,7 +5,7 @@
     "engine": "SeaTunnel",
     "env": {
       "job.mode": "BATCH",
-      "job.name": "SeaTunnel_Job",
+      "job.name": "${jonName:SeaTunnel_Job}",
       "jars": "",
       "checkpoint.interval": "",
       "checkpoint.timeout": "",
@@ -36,7 +36,7 @@
       },
       "dataSourceId": 1,
       "sceneMode": "SINGLE_TABLE",
-      "config": "{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields 
{\\n        name = \\\"string\\\"\\n        age = \\\"int\\\"\\n      
}\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"doubl
 [...]
+      "config": "{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields 
{\\n        name = \\\"string\\\"\\n        age = \\\"int\\\"\\n      
}\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"doubl
 [...]
       "outputSchema": [
         {
           "fields": [
@@ -69,6 +69,48 @@
       ],
       "transformOptions": {}
     },
+    {
+      "name": "transform-replace",
+      "type": "TRANSFORM",
+      "connectorType": "Replace",
+      "selectTableFields": {
+        "tableFields": [],
+        "all": false
+      },
+      "sceneMode": "SINGLE_TABLE",
+      "config": 
"{\"query\":\"\",\"replace_field\":\"name\",\"pattern\":\"OK\",\"replacement\":\"ITS
 
OK.\",\"is_regex\":\"${isRegex:false}\",\"replace_first\":\"${firstReplace:false}\"}",
+      "outputSchema": [
+        {
+          "fields": [
+            {
+              "type": "string",
+              "name": "name",
+              "comment": "",
+              "primaryKey": true,
+              "defaultValue": null,
+              "nullable": false,
+              "properties": null,
+              "unSupport": false,
+              "outputDataType": "STRING"
+            },
+            {
+              "type": "int",
+              "name": "age",
+              "comment": "",
+              "primaryKey": false,
+              "defaultValue": null,
+              "nullable": false,
+              "properties": null,
+              "unSupport": false,
+              "outputDataType": "INT"
+            }
+          ],
+          "tableName": "fake_table",
+          "database": "fake_database"
+        }
+      ],
+      "transformOptions": {}
+    },
     {
       "name": "sink-console",
       "type": "SINK",
@@ -86,7 +128,7 @@
         "all": false
       },
       "dataSourceId": 2,
-      "config": "{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"100\"}",
+      "config": 
"{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"${logPrintDelayMs:100}\"}",
       "transformOptions": {}
     }
   ],
@@ -94,6 +136,10 @@
     "edges": [
       {
         "inputPluginId": "source-fake-source",
+        "targetPluginId": "transform-replace"
+      },
+      {
+        "inputPluginId": "transform-replace",
         "targetPluginId": "sink-console"
       }
     ]
diff --git a/seatunnel-web-it/src/test/resources/logback-spring.xml 
b/seatunnel-web-it/src/test/resources/logback-spring.xml
index 145a239d..74ccf4cd 100644
--- a/seatunnel-web-it/src/test/resources/logback-spring.xml
+++ b/seatunnel-web-it/src/test/resources/logback-spring.xml
@@ -41,7 +41,7 @@
         </encoder>
     </appender>
 
-    <root level="INFO">
+    <root level="DEBUG">
         <appender-ref ref="seatunnel-web" />
         <appender-ref ref="console" />
     </root>


Reply via email to