This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push: new 70e526a2 [Feature][Seatunnel-web]Add support to configure placeholder with default value in the job config. (#208) 70e526a2 is described below commit 70e526a2d38a4fe1243dc0e9d74edeb4f3826941 Author: Mohammad Arshad <ars...@apache.org> AuthorDate: Mon Sep 9 07:55:19 2024 +0530 [Feature][Seatunnel-web]Add support to configure placeholder with default value in the job config. (#208) --- .../app/domain/request/job/JobExecParam.java | 6 +- .../app/service/impl/JobExecutorServiceImpl.java | 2 +- .../app/service/impl/JobInstanceServiceImpl.java | 14 +-- .../org/apache/seatunnel/app/utils/JobUtils.java | 74 ++++++------- .../apache/seatunnel/app/utils/JobUtilsTests.java | 120 +++++++++++++++++++++ .../server/common/SeatunnelErrorEnum.java | 4 +- .../app/controller/JobControllerWrapper.java | 15 --- .../seatunnel/app/test/JobControllerTest.java | 45 +++----- .../app/test/JobExecutorControllerTest.java | 77 +++---------- .../seatunnel/app/utils/JobTestingUtils.java | 41 +++++++ .../resources/jobs/fake_source_console_job.json | 52 ++++++++- .../src/test/resources/logback-spring.xml | 2 +- 12 files changed, 276 insertions(+), 176 deletions(-) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java index 223429cc..3aa54ecd 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobExecParam.java @@ -27,10 +27,8 @@ import java.util.Map; @AllArgsConstructor // Job execution parameters public class JobExecParam { - // job name -> key -> value - private Map<String, String> env; - // task name -> key -> value - private Map<String, Map<String, String>> tasks; + // job config placeholder name -> value + private Map<String, String> placeholderValues; // task name -> new datasource id private Map<String, String> datasource; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java index ec1b37de..185222eb 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java @@ -80,7 +80,7 @@ public class JobExecutorServiceImpl implements IJobExecutorService { return Result.success(executeResource.getJobInstanceId()); } catch (RuntimeException e) { Result<Long> failure = - Result.failure(SeatunnelErrorEnum.JUB_EXEC_SUBMISSION_ERROR, e.getMessage()); + Result.failure(SeatunnelErrorEnum.JOB_EXEC_SUBMISSION_ERROR, e.getMessage()); // Even though job execution submission failed, we still need to return the // jobInstanceId to the user // as the job instance has been created in the database. diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java index d58d19b4..70b1201e 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java @@ -174,7 +174,6 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl BusinessMode businessMode = BusinessMode.valueOf(jobDefinitionDao.getJob(jobId).getJobType()); Config envConfig = filterEmptyValue(ConfigFactory.parseString(envStr)); - envConfig = JobUtils.updateEnvConfig(executeParam, envConfig); JobUtils.updateDataSource(executeParam, tasks); Map<String, List<Config>> sourceMap = new LinkedHashMap<>(); @@ -230,8 +229,6 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl ParsingMode.SHARDING.name())); } - config = - JobUtils.updateTaskConfig(executeParam, config, task.getName()); Config mergeConfig = mergeTaskConfig( task, @@ -240,9 +237,6 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl businessMode, config, optionRule); - mergeConfig = - JobUtils.updateQueryTaskConfig( - executeParam, mergeConfig, task.getName()); sourceMap .get(task.getConnectorType()) .add(filterEmptyValue(mergeConfig)); @@ -272,9 +266,6 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl } List<TableSchemaReq> inputSchemas = findInputSchemas(tasks, lines, task); Config transformConfig = buildTransformConfig(task, config, inputSchemas); - transformConfig = - JobUtils.updateTaskConfig( - executeParam, transformConfig, task.getName()); transformMap .get(task.getConnectorType()) .add(filterEmptyValue(transformConfig)); @@ -289,8 +280,6 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl if (!sinkMap.containsKey(task.getConnectorType())) { sinkMap.put(task.getConnectorType(), new ArrayList<>()); } - config = - JobUtils.updateTaskConfig(executeParam, config, task.getName()); Config mergeConfig = mergeTaskConfig( task, @@ -341,7 +330,8 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl .setJson(false) .setComments(false) .setOriginComments(false)); - return SeaTunnelConfigUtil.generateConfig(env, sources, transforms, sinks); + String jobConfig = SeaTunnelConfigUtil.generateConfig(env, sources, transforms, sinks); + return JobUtils.replaceJobConfigPlaceholders(jobConfig, executeParam); } @Override diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java index bfa36830..b51db986 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java @@ -16,20 +16,23 @@ */ package org.apache.seatunnel.app.utils; -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; - import org.apache.seatunnel.app.dal.entity.JobTask; import org.apache.seatunnel.app.domain.request.job.JobExecParam; import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.server.common.SeatunnelErrorEnum; +import org.apache.seatunnel.server.common.SeatunnelException; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class JobUtils { // The maximum length of the job execution error message, 4KB private static final int ERROR_MESSAGE_MAX_LENGTH = 4096; + private static final Pattern placeholderPattern = Pattern.compile("\\$\\{(\\w+)(?::(.*?))?\\}"); public static String getJobInstanceErrorMessage(String message) { if (message == null) { @@ -40,46 +43,6 @@ public class JobUtils { : message; } - public static Config updateEnvConfig(JobExecParam jobExecParam, Config envConfig) { - if (jobExecParam == null || jobExecParam.getEnv() == null) { - return envConfig; - } - return updateConfig(envConfig, jobExecParam.getEnv()); - } - - private static Config updateConfig(Config config, Map<String, String> properties) { - for (Map.Entry<String, String> entry : properties.entrySet()) { - config = - config.withValue( - entry.getKey(), ConfigValueFactory.fromAnyRef(entry.getValue())); - } - return config; - } - - public static Config updateTaskConfig( - JobExecParam jobExecParam, Config taskConfig, String taskName) { - if (jobExecParam == null - || jobExecParam.getTasks() == null - || jobExecParam.getTasks().get(taskName) == null) { - return taskConfig; - } - return updateConfig(taskConfig, jobExecParam.getTasks().get(taskName)); - } - - public static Config updateQueryTaskConfig( - JobExecParam jobExecParam, Config taskConfig, String taskName) { - if (jobExecParam == null - || jobExecParam.getTasks() == null - || jobExecParam.getTasks().get(taskName) == null) { - return taskConfig; - } - String query = jobExecParam.getTasks().get(taskName).get("query"); - if (query != null) { - return taskConfig.withValue("query", ConfigValueFactory.fromAnyRef(query)); - } - return taskConfig; - } - public static void updateDataSource(JobExecParam jobExecParam, List<JobTask> tasks) { if (jobExecParam == null || jobExecParam.getDatasource() == null) { return; @@ -104,4 +67,29 @@ public class JobUtils { || JobStatus.CANCELED == jobStatus || JobStatus.FAILED == jobStatus; } + + // Replace placeholders in job config with actual values + public static String replaceJobConfigPlaceholders( + String jobConfigString, JobExecParam jobExecParam) { + Map<String, String> placeholderValues = + (jobExecParam != null && jobExecParam.getPlaceholderValues() != null) + ? jobExecParam.getPlaceholderValues() + : Collections.emptyMap(); + + Matcher matcher = placeholderPattern.matcher(jobConfigString); + StringBuffer result = new StringBuffer(); + + while (matcher.find()) { + String placeholderName = matcher.group(1); + String replacement = placeholderValues.getOrDefault(placeholderName, matcher.group(2)); + if (replacement == null) { + throw new SeatunnelException( + SeatunnelErrorEnum.JOB_NO_VALUE_FOUND_FOR_PLACEHOLDER, placeholderName); + } + matcher.appendReplacement(result, replacement); + } + + matcher.appendTail(result); + return result.toString(); + } } diff --git a/seatunnel-server/seatunnel-app/src/test/java/org/apache/seatunnel/app/utils/JobUtilsTests.java b/seatunnel-server/seatunnel-app/src/test/java/org/apache/seatunnel/app/utils/JobUtilsTests.java new file mode 100644 index 00000000..31c37fb9 --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/test/java/org/apache/seatunnel/app/utils/JobUtilsTests.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.app.utils; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import org.apache.seatunnel.app.domain.request.job.JobExecParam; +import org.apache.seatunnel.server.common.SeatunnelException; + +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class JobUtilsTests { + + @Test + public void testReplaceJobConfigPlaceholders_AllJobConfigPlaceholdersReplaced() { + String jobConfigContent = + "job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobNameParam}"; + Map<String, String> paramValues = new HashMap<>(); + paramValues.put("jobModeParam", "STREAMING"); + paramValues.put("jobNameParam", "newJob"); + JobExecParam jobExecParam = getJobExecParam(paramValues); + + String expected = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob"; + String actual = JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam); + + assertEquals(expected, actual); + } + + @Test + public void testReplaceJobConfigPlaceholders_JobConfig_PlaceholdersRepeat() { + String jobConfigContent = + "job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobModeParam}"; + Map<String, String> paramValues = new HashMap<>(); + paramValues.put("jobModeParam", "STREAMING"); + JobExecParam jobExecParam = getJobExecParam(paramValues); + + String expected = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=STREAMING"; + String actual = JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam); + + assertEquals(expected, actual); + } + + @Test + public void testReplaceJobConfigPlaceholdersUsed() { + String jobConfigContent = + "job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobNameParam:DefaultJob}"; + Map<String, String> paramValues = new HashMap<>(); + paramValues.put("jobModeParam", "STREAMING"); + JobExecParam jobExecParam = getJobExecParam(paramValues); + + String expected = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=DefaultJob"; + String actual = JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam); + + assertEquals(expected, actual); + } + + @Test + public void testReplaceJobConfigPlaceholders_NoDefaultValueThrowsException() { + String jobConfigContent = + "job.mode=${jobModeParam}\ncheckpoint.interval=30\njob.name=${jobNameParam}"; + Map<String, String> paramValues = new HashMap<>(); + paramValues.put("jobModeParam", "STREAMING"); + JobExecParam jobExecParam = getJobExecParam(paramValues); + + assertThrows( + SeatunnelException.class, + () -> { + JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam); + }); + } + + @Test + public void testReplaceJobConfigPlaceholders_NoJobConfigPlaceholders() { + String jobConfigContent = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob"; + Map<String, String> paramValues = new HashMap<>(); + JobExecParam jobExecParam = getJobExecParam(paramValues); + + String expected = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob"; + String actual = JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam); + + assertEquals(expected, actual); + } + + @Test + public void testParseConfigWithPlaceHolders() { + String transformConfig = + "{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"${logPrintDelayMs:100}\"}"; + Config config = ConfigFactory.parseString(transformConfig); + assertNotNull(config); + } + + private static @NotNull JobExecParam getJobExecParam(Map<String, String> paramValues) { + JobExecParam jobExecParam = new JobExecParam(); + jobExecParam.setPlaceholderValues(paramValues); + return jobExecParam; + } +} diff --git a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java index dc70cb27..53dde9ad 100644 --- a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java +++ b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java @@ -82,9 +82,11 @@ public enum SeatunnelErrorEnum { "load job state from engine error", "load job statue from engine [%s] error, error msg is [%s]"), UNSUPPORTED_ENGINE(40003, "unsupported engine", "unsupported engine [%s] version [%s]"), - JUB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"), + JOB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"), LOAD_ENGINE_METRICS_ERROR( 40005, "load engine metrics error", "load engine metrics error. error msg is [%s]"), + JOB_NO_VALUE_FOUND_FOR_PLACEHOLDER( + 40006, "No value found for placeholder", "No value found for placeholder: [%s]"), JOB_RUN_GENERATE_UUID_ERROR(50001, "generate uuid error", "generate uuid error"), /* datasource and virtual table */ diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java index b899835f..be654fba 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java @@ -25,10 +25,6 @@ import org.apache.seatunnel.app.utils.JSONUtils; import com.fasterxml.jackson.core.type.TypeReference; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; - public class JobControllerWrapper extends SeatunnelWebTestingBase { public Result<Long> createJob(JobCreateReq jobCreateRequest) { @@ -48,15 +44,4 @@ public class JobControllerWrapper extends SeatunnelWebTestingBase { String response = sendRequest(urlWithParam("job/get/" + jobVersionId + "?"), null, "GET"); return JSONTestUtils.parseObject(response, new TypeReference<Result<JobRes>>() {}); } - - public JobCreateReq populateJobCreateReqFromFile() { - String filePath = "src/test/resources/jobs/fake_source_console_job.json"; - String jsonContent; - try { - jsonContent = new String(Files.readAllBytes(Paths.get(filePath))); - } catch (IOException e) { - throw new RuntimeException(e); - } - return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class); - } } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java index c963324c..6b51a94c 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java @@ -20,7 +20,6 @@ import org.apache.seatunnel.app.common.Result; import org.apache.seatunnel.app.common.SeaTunnelWebCluster; import org.apache.seatunnel.app.controller.JobControllerWrapper; import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper; -import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper; import org.apache.seatunnel.app.domain.request.job.JobConfig; import org.apache.seatunnel.app.domain.request.job.JobCreateReq; import org.apache.seatunnel.app.domain.request.job.JobDAG; @@ -45,7 +44,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class JobControllerTest { private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster(); - private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper; private static JobControllerWrapper jobControllerWrapper; private static JobExecutorControllerWrapper jobExecutorControllerWrapper; private static final String uniqueId = "_" + System.currentTimeMillis(); @@ -53,7 +51,6 @@ public class JobControllerTest { @BeforeAll public static void setUp() { seaTunnelWebCluster.start(); - seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper(); jobControllerWrapper = new JobControllerWrapper(); jobExecutorControllerWrapper = new JobExecutorControllerWrapper(); } @@ -61,11 +58,9 @@ public class JobControllerTest { @Test public void createJobWithSingleAPI_shouldExecuteSuccessfully() { String jobName = "jobWithSingleAPI" + uniqueId; - JobCreateReq jobCreateReq = jobControllerWrapper.populateJobCreateReqFromFile(); - jobCreateReq.getJobConfig().setName(jobName); - jobCreateReq.getJobConfig().setDescription(jobName + " description"); - setSourceIds(jobCreateReq, "fake_source_create" + uniqueId, "console_create" + uniqueId); - + JobCreateReq jobCreateReq = + JobTestingUtils.populateJobCreateReqFromFile( + jobName, "fake_source_create" + uniqueId, "console_create" + uniqueId); Result<Long> job = jobControllerWrapper.createJob(jobCreateReq); assertTrue(job.isSuccess()); Result<Long> result = jobExecutorControllerWrapper.jobExecutor(job.getData()); @@ -79,25 +74,12 @@ public class JobControllerTest { assertEquals(5, listResult.getData().get(0).getWriteRowCount()); } - private void setSourceIds( - JobCreateReq jobCreateReq, String fsdSourceName, String csSourceName) { - // Set the data source id for the plugin configs - String fakeSourceDataSourceId = - seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(fsdSourceName); - String consoleDataSourceId = - seatunnelDatasourceControllerWrapper.createConsoleDatasource(csSourceName); - for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) { - if (pluginConfig.getName().equals("source-fake-source")) { - pluginConfig.setDataSourceId(Long.parseLong(fakeSourceDataSourceId)); - } else if (pluginConfig.getName().equals("sink-console")) { - pluginConfig.setDataSourceId(Long.parseLong(consoleDataSourceId)); - } - } - } - @Test public void createJobWithSingleAPI_ValidateInput() { - JobCreateReq jobCreateReq = jobControllerWrapper.populateJobCreateReqFromFile(); + String jobName = "jobWithSingleAPI2" + uniqueId; + JobCreateReq jobCreateReq = + JobTestingUtils.populateJobCreateReqFromFile( + jobName, "fake_source_create_2" + uniqueId, "console_create_2" + uniqueId); JobConfig jobConfig = jobCreateReq.getJobConfig(); jobConfig.setName(""); Result<Long> result = jobControllerWrapper.createJob(jobCreateReq); @@ -105,7 +87,7 @@ public class JobControllerTest { assertEquals(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL.getCode(), result.getCode()); assertEquals("param [name] can not be null or empty", result.getMsg()); - String jobName = "jobValidation" + uniqueId; + jobName = "jobValidation" + uniqueId; jobConfig.setName(jobName); jobConfig.setDescription(null); result = jobControllerWrapper.createJob(jobCreateReq); @@ -129,7 +111,8 @@ public class JobControllerTest { result.getMsg()); jobConfig.getEnv().put("job.mode", "BATCH"); - setSourceIds(jobCreateReq, "fake_source_create2" + uniqueId, "console_create2" + uniqueId); + // setSourceIds(jobCreateReq, "fake_source_create2" + uniqueId, "console_create2" + + // uniqueId); result = jobControllerWrapper.createJob(jobCreateReq); assertTrue(result.isSuccess()); assertEquals(0, result.getCode()); @@ -139,11 +122,9 @@ public class JobControllerTest { @Test public void testUpdateJob_ForValidAndInvalidScenarios() { String jobName = "updateJob_single_api" + uniqueId; - JobCreateReq jobCreateReq = jobControllerWrapper.populateJobCreateReqFromFile(); - jobCreateReq.getJobConfig().setName(jobName); - jobCreateReq.getJobConfig().setDescription(jobName + " description"); - setSourceIds( - jobCreateReq, "fake_source_update_job" + uniqueId, "console_update_job" + uniqueId); + JobCreateReq jobCreateReq = + JobTestingUtils.populateJobCreateReqFromFile( + jobName, "fake_source_create_3" + uniqueId, "console_create_3" + uniqueId); Result<Long> job = jobControllerWrapper.createJob(jobCreateReq); assertTrue(job.isSuccess()); diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java index 6e1cb1ea..63488d11 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java @@ -81,7 +81,10 @@ public class JobExecutorControllerTest { @Test public void executeJobWithParameters() { String jobName = "execJobWithParam" + uniqueId; - long jobVersionId = JobTestingUtils.createJob(jobName); + JobCreateReq jobCreateReq = + JobTestingUtils.populateJobCreateReqFromFile( + jobName, "fake_source_exec-1" + uniqueId, "console_exec-1" + uniqueId); + long jobVersionId = JobTestingUtils.createJob(jobCreateReq); Result<Long> result = jobExecutorControllerWrapper.jobExecutor(jobVersionId); assertTrue(result.isSuccess()); assertTrue(result.getData() > 0); @@ -96,26 +99,19 @@ public class JobExecutorControllerTest { assertTrue(generatedJobFile.contains("\"log.print.delay.ms\"=\"100\"")); JobExecParam jobExecParam = new JobExecParam(); + Map<String, String> placeholderValues = new HashMap<>(); - Map<String, String> envConf = new HashMap<>(); - envConf.put("job.name", "executeJobWithParameters"); - jobExecParam.setEnv(envConf); - Map<String, Map<String, String>> tasks = new HashMap<>(); - jobExecParam.setTasks(tasks); - + // source configuration int numberOfRecords = 100; - Map<String, String> task1Config = new HashMap<>(); - task1Config.put("row.num", String.valueOf(numberOfRecords)); - tasks.put("source-fakesource", task1Config); + placeholderValues.put("rowNum", String.valueOf(numberOfRecords)); - Map<String, String> task2Config = new HashMap<>(); - task2Config.put("replace_first", "true"); - task2Config.put("is_regex", "true"); - tasks.put("transform-replace", task2Config); + // transform configuration + placeholderValues.put("firstReplace", "true"); + placeholderValues.put("isRegex", "true"); - Map<String, String> task3Config = new HashMap<>(); - task3Config.put("log.print.delay.ms", "99"); - tasks.put("sink-console", task3Config); + // sink configuration + placeholderValues.put("logPrintDelayMs", "99"); + jobExecParam.setPlaceholderValues(placeholderValues); result = jobExecutorControllerWrapper.jobExecutor(jobVersionId, jobExecParam); assertTrue(result.isSuccess()); @@ -132,53 +128,6 @@ public class JobExecutorControllerTest { assertTrue(generatedJobFile.contains("\"replace_first\"=\"true\"")); // database properties except query can not be updated assertFalse(generatedJobFile.contains("\"log.print.delay.ms\"=\"99\"")); - assertTrue(generatedJobFile.contains("\"job.name\"=executeJobWithParameters")); - } - - @Test - public void executeJobWithParameters_AllowQueryUpdate() { - String jobName = "execJobUpdateQuery" + uniqueId; - JobCreateReq jobCreateReq = JobTestingUtils.populateMySQLJobCreateReqFromFile(); - jobCreateReq.getJobConfig().setName(jobName); - jobCreateReq.getJobConfig().setDescription(jobName + " description"); - String datasourceName = "execJobUpdateQuery_db" + uniqueId; - String mysqlDatasourceId = - seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName); - for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) { - pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId)); - } - Result<Long> job = jobControllerWrapper.createJob(jobCreateReq); - assertTrue(job.isSuccess()); - Long jobVersionId = job.getData(); - Result<Long> result = jobExecutorControllerWrapper.jobExecutor(jobVersionId); - // Fails because of the wrong credentials of the database. - assertFalse(result.isSuccess()); - String generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId)); - assertTrue( - generatedJobFile.contains( - "query=\"SELECT `name`, `age` FROM `test`.`test_table`\"")); - assertTrue(generatedJobFile.contains("user=someUser")); - assertTrue(generatedJobFile.contains("password=somePassword")); - - JobExecParam jobExecParam = new JobExecParam(); - Map<String, Map<String, String>> tasks = new HashMap<>(); - jobExecParam.setTasks(tasks); - - Map<String, String> task1Config = new HashMap<>(); - task1Config.put("query", "SELECT `name`, `age` FROM `test`.`test_table` LIMIT 10"); - task1Config.put("user", "otherUser"); - task1Config.put("password", "otherPassword"); - tasks.put("mysql_source_1", task1Config); - - result = jobExecutorControllerWrapper.jobExecutor(jobVersionId, jobExecParam); - assertFalse(result.isSuccess()); - // query should be changed but other database details should not be changed, - generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId)); - assertTrue( - generatedJobFile.contains( - "query=\"SELECT `name`, `age` FROM `test`.`test_table` LIMIT 10\"")); - assertTrue(generatedJobFile.contains("user=someUser")); - assertTrue(generatedJobFile.contains("password=somePassword")); } @Test diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java index f4d6c525..e9dc309f 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.app.utils; import org.apache.seatunnel.app.common.Result; import org.apache.seatunnel.app.controller.JobConfigControllerWrapper; +import org.apache.seatunnel.app.controller.JobControllerWrapper; import org.apache.seatunnel.app.controller.JobDefinitionControllerWrapper; import org.apache.seatunnel.app.controller.JobMetricsControllerWrapper; import org.apache.seatunnel.app.controller.JobTaskControllerWrapper; @@ -26,6 +27,7 @@ import org.apache.seatunnel.app.domain.request.job.Edge; import org.apache.seatunnel.app.domain.request.job.JobConfig; import org.apache.seatunnel.app.domain.request.job.JobCreateReq; import org.apache.seatunnel.app.domain.request.job.JobDAG; +import org.apache.seatunnel.app.domain.request.job.PluginConfig; import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; @@ -49,6 +51,7 @@ public class JobTestingUtils { new JobTaskControllerWrapper(); private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper(); + private static JobControllerWrapper jobControllerWrapper = new JobControllerWrapper(); private static final long TIMEOUT = 60; // 1 minute private static final long INTERVAL = 2; // 1 second @@ -166,4 +169,42 @@ public class JobTestingUtils { } return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class); } + + public static JobCreateReq populateJobCreateReqFromFile( + String jobName, String fsdSourceName, String csSourceName) { + String filePath = "src/test/resources/jobs/fake_source_console_job.json"; + String jsonContent; + try { + jsonContent = new String(Files.readAllBytes(Paths.get(filePath))); + } catch (IOException e) { + throw new RuntimeException(e); + } + JobCreateReq jobCreateReq = JSONTestUtils.parseObject(jsonContent, JobCreateReq.class); + jobCreateReq.getJobConfig().setName(jobName); + jobCreateReq.getJobConfig().setDescription(jobName + " description"); + setSourceIds(jobCreateReq, fsdSourceName, csSourceName); + return jobCreateReq; + } + + private static void setSourceIds( + JobCreateReq jobCreateReq, String fsdSourceName, String csSourceName) { + // Set the data source id for the plugin configs + String fakeSourceDataSourceId = + seatunnelDatasourceControllerWrapper.createFakeSourceDatasource(fsdSourceName); + String consoleDataSourceId = + seatunnelDatasourceControllerWrapper.createConsoleDatasource(csSourceName); + for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) { + if (pluginConfig.getName().equals("source-fake-source")) { + pluginConfig.setDataSourceId(Long.parseLong(fakeSourceDataSourceId)); + } else if (pluginConfig.getName().equals("sink-console")) { + pluginConfig.setDataSourceId(Long.parseLong(consoleDataSourceId)); + } + } + } + + public static Long createJob(JobCreateReq jobCreateReq) { + Result<Long> jobCreateResult = jobControllerWrapper.createJob(jobCreateReq); + assertTrue(jobCreateResult.isSuccess()); + return jobCreateResult.getData(); + } } diff --git a/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json b/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json index acab2f66..324c1d9b 100644 --- a/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json +++ b/seatunnel-web-it/src/test/resources/jobs/fake_source_console_job.json @@ -5,7 +5,7 @@ "engine": "SeaTunnel", "env": { "job.mode": "BATCH", - "job.name": "SeaTunnel_Job", + "job.name": "${jonName:SeaTunnel_Job}", "jars": "", "checkpoint.interval": "", "checkpoint.timeout": "", @@ -36,7 +36,7 @@ }, "dataSourceId": 1, "sceneMode": "SINGLE_TABLE", - "config": "{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields {\\n name = \\\"string\\\"\\n age = \\\"int\\\"\\n }\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"doubl [...] + "config": "{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields {\\n name = \\\"string\\\"\\n age = \\\"int\\\"\\n }\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"doubl [...] "outputSchema": [ { "fields": [ @@ -69,6 +69,48 @@ ], "transformOptions": {} }, + { + "name": "transform-replace", + "type": "TRANSFORM", + "connectorType": "Replace", + "selectTableFields": { + "tableFields": [], + "all": false + }, + "sceneMode": "SINGLE_TABLE", + "config": "{\"query\":\"\",\"replace_field\":\"name\",\"pattern\":\"OK\",\"replacement\":\"ITS OK.\",\"is_regex\":\"${isRegex:false}\",\"replace_first\":\"${firstReplace:false}\"}", + "outputSchema": [ + { + "fields": [ + { + "type": "string", + "name": "name", + "comment": "", + "primaryKey": true, + "defaultValue": null, + "nullable": false, + "properties": null, + "unSupport": false, + "outputDataType": "STRING" + }, + { + "type": "int", + "name": "age", + "comment": "", + "primaryKey": false, + "defaultValue": null, + "nullable": false, + "properties": null, + "unSupport": false, + "outputDataType": "INT" + } + ], + "tableName": "fake_table", + "database": "fake_database" + } + ], + "transformOptions": {} + }, { "name": "sink-console", "type": "SINK", @@ -86,7 +128,7 @@ "all": false }, "dataSourceId": 2, - "config": "{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"100\"}", + "config": "{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"${logPrintDelayMs:100}\"}", "transformOptions": {} } ], @@ -94,6 +136,10 @@ "edges": [ { "inputPluginId": "source-fake-source", + "targetPluginId": "transform-replace" + }, + { + "inputPluginId": "transform-replace", "targetPluginId": "sink-console" } ] diff --git a/seatunnel-web-it/src/test/resources/logback-spring.xml b/seatunnel-web-it/src/test/resources/logback-spring.xml index 145a239d..74ccf4cd 100644 --- a/seatunnel-web-it/src/test/resources/logback-spring.xml +++ b/seatunnel-web-it/src/test/resources/logback-spring.xml @@ -41,7 +41,7 @@ </encoder> </appender> - <root level="INFO"> + <root level="DEBUG"> <appender-ref ref="seatunnel-web" /> <appender-ref ref="console" /> </root>