This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push: new 30ed42fd [Improvement] [Seatunnel-web] Add API to get and update job definition. (#198) 30ed42fd is described below commit 30ed42fdac21a0edc6a9cb06ed00f81967b64717 Author: Mohammad Arshad <ars...@apache.org> AuthorDate: Fri Aug 30 08:19:56 2024 +0530 [Improvement] [Seatunnel-web] Add API to get and update job definition. (#198) --- .../seatunnel/app/controller/JobController.java | 27 ++++++ .../seatunnel/app/domain/request/job/JobDAG.java | 4 + .../job/JobDAG.java => response/job/JobRes.java} | 15 +++- .../apache/seatunnel/app/service/IJobService.java | 6 ++ .../app/service/impl/JobConfigServiceImpl.java | 8 ++ .../seatunnel/app/service/impl/JobServiceImpl.java | 23 +++++ .../app/service/impl/TaskInstanceServiceImpl.java | 8 +- .../server/common/SeatunnelErrorEnum.java | 1 + .../app/controller/JobControllerWrapper.java | 13 +++ .../seatunnel/app/test/JobControllerTest.java | 97 +++++++++++++++++++++- 10 files changed, 194 insertions(+), 8 deletions(-) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java index a6d941ea..b7b70ba7 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobController.java @@ -18,9 +18,13 @@ package org.apache.seatunnel.app.controller; import org.apache.seatunnel.app.common.Result; import org.apache.seatunnel.app.domain.request.job.JobCreateReq; +import org.apache.seatunnel.app.domain.response.job.JobRes; import org.apache.seatunnel.app.service.IJobService; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; +import 
org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestAttribute; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; @@ -49,4 +53,27 @@ public class JobController { throws JsonProcessingException { return Result.success(jobCRUDService.createJob(userId, jobCreateRequest)); } + + @PutMapping("/update/{jobVersionId}") + @ApiOperation( + value = "Update a job, all the existing ids should be passed in the request.", + httpMethod = "PUT") + public Result<Void> updateJob( + @ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId, + @ApiParam(value = "jobVersionId", required = true) @PathVariable long jobVersionId, + @RequestBody JobCreateReq jobCreateReq) + throws JsonProcessingException { + jobCRUDService.updateJob(userId, jobVersionId, jobCreateReq); + return Result.success(); + } + + @GetMapping("/get/{jobVersionId}") + @ApiOperation(value = "Get a job detail.", httpMethod = "GET") + public Result<JobRes> getJob( + @ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId, + @ApiParam(value = "jobVersionId", required = true) @PathVariable long jobVersionId) + throws JsonProcessingException { + JobRes jobRes = jobCRUDService.getJob(userId, jobVersionId); + return Result.success(jobRes); + } } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobDAG.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobDAG.java index 944ea793..30fb8a18 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobDAG.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobDAG.java @@ -17,11 +17,15 @@ package org.apache.seatunnel.app.domain.request.job; +import lombok.AllArgsConstructor; import lombok.Data; +import 
lombok.NoArgsConstructor; import java.util.List; @Data +@NoArgsConstructor +@AllArgsConstructor public class JobDAG { private List<Edge> edges; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobDAG.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/job/JobRes.java similarity index 67% copy from seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobDAG.java copy to seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/job/JobRes.java index 944ea793..87142d0e 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/JobDAG.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/job/JobRes.java @@ -14,15 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.seatunnel.app.domain.response.job; -package org.apache.seatunnel.app.domain.request.job; +import org.apache.seatunnel.app.domain.request.job.JobDAG; +import org.apache.seatunnel.app.domain.request.job.PluginConfig; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; import java.util.List; @Data -public class JobDAG { - - private List<Edge> edges; +@NoArgsConstructor +@AllArgsConstructor +public class JobRes { + private JobConfigRes jobConfig; + private List<PluginConfig> pluginConfigs; + private JobDAG jobDAG; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java index 7faa4a79..e8e5904b 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobService.java @@ 
-17,10 +17,16 @@ package org.apache.seatunnel.app.service; import org.apache.seatunnel.app.domain.request.job.JobCreateReq; +import org.apache.seatunnel.app.domain.response.job.JobRes; import com.fasterxml.jackson.core.JsonProcessingException; public interface IJobService { long createJob(int userId, JobCreateReq jobCreateRequest) throws JsonProcessingException; + + void updateJob(Integer userId, long jobId, JobCreateReq jobCreateReq) + throws JsonProcessingException; + + JobRes getJob(Integer userId, long jobId) throws JsonProcessingException; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java index eb6e7c6d..0ec04f53 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobConfigServiceImpl.java @@ -57,6 +57,10 @@ public class JobConfigServiceImpl extends SeatunnelBaseServiceImpl implements IJ public JobConfigRes getJobConfig(long jobVersionId) throws JsonProcessingException { funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_CONFIG_DETAIL, 0); JobVersion jobVersion = jobVersionDao.getVersionById(jobVersionId); + if (jobVersion == null) { + throw new SeatunnelException( + SeatunnelErrorEnum.RESOURCE_NOT_FOUND, "job version not found."); + } JobDefinition jobDefinition = jobDefinitionDao.getJob(jobVersion.getJobId()); JobConfigRes jobConfigRes = new JobConfigRes(); jobConfigRes.setName(jobDefinition.getName()); @@ -80,6 +84,10 @@ public class JobConfigServiceImpl extends SeatunnelBaseServiceImpl implements IJ throws JsonProcessingException { funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_CONFIG_UPDATE, 0); JobVersion version = jobVersionDao.getVersionById(jobVersionId); + if (version == null) { + throw new 
SeatunnelException( + SeatunnelErrorEnum.RESOURCE_NOT_FOUND, "job version not found."); + } JobDefinition jobDefinition = new JobDefinition(); jobDefinition.setId(version.getJobId()); jobDefinition.setUpdateUserId(userId); diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java index d51835bd..bd54621c 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobServiceImpl.java @@ -23,7 +23,10 @@ import org.apache.seatunnel.app.domain.request.job.JobConfig; import org.apache.seatunnel.app.domain.request.job.JobCreateReq; import org.apache.seatunnel.app.domain.request.job.JobDAG; import org.apache.seatunnel.app.domain.request.job.JobReq; +import org.apache.seatunnel.app.domain.request.job.JobTaskInfo; import org.apache.seatunnel.app.domain.request.job.PluginConfig; +import org.apache.seatunnel.app.domain.response.job.JobConfigRes; +import org.apache.seatunnel.app.domain.response.job.JobRes; import org.apache.seatunnel.app.service.IJobConfigService; import org.apache.seatunnel.app.service.IJobDefinitionService; import org.apache.seatunnel.app.service.IJobService; @@ -108,4 +111,24 @@ public class JobServiceImpl implements IJobService { } return jobReq; } + + @Override + public void updateJob(Integer userId, long jobVersionId, JobCreateReq jobCreateReq) + throws JsonProcessingException { + jobConfigService.updateJobConfig(userId, jobVersionId, jobCreateReq.getJobConfig()); + List<PluginConfig> pluginConfigs = jobCreateReq.getPluginConfigs(); + if (pluginConfigs != null) { + for (PluginConfig pluginConfig : pluginConfigs) { + jobTaskService.saveSingleTask(jobVersionId, pluginConfig); + } + } + jobTaskService.saveJobDAG(jobVersionId, jobCreateReq.getJobDAG()); + 
} + + @Override + public JobRes getJob(Integer userId, long jobVersionId) throws JsonProcessingException { + JobConfigRes jobConfig = jobConfigService.getJobConfig(jobVersionId); + JobTaskInfo taskConfig = jobTaskService.getTaskConfig(jobVersionId); + return new JobRes(jobConfig, taskConfig.getPlugins(), new JobDAG(taskConfig.getEdges())); + } } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java index 0f7a2266..2d2f4d90 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java @@ -31,6 +31,8 @@ import org.apache.seatunnel.app.service.IJobDefinitionService; import org.apache.seatunnel.app.service.IJobMetricsService; import org.apache.seatunnel.app.service.ITaskInstanceService; import org.apache.seatunnel.app.utils.PageInfo; +import org.apache.seatunnel.server.common.SeatunnelErrorEnum; +import org.apache.seatunnel.server.common.SeatunnelException; import org.apache.commons.collections4.CollectionUtils; @@ -200,7 +202,8 @@ public class TaskInstanceServiceImpl implements ITaskInstanceService<SeaTunnelJo public Result<JobExecutionStatus> getJobExecutionStatus(Integer userId, long jobInstanceId) { JobInstance jobInstance = jobInstanceDao.getJobExecutionStatus(jobInstanceId); if (jobInstance == null) { - return Result.failure(404, "Job instance not found"); + throw new SeatunnelException( + SeatunnelErrorEnum.RESOURCE_NOT_FOUND, "Job instance not found"); } return Result.success( new JobExecutionStatus(jobInstance.getJobStatus(), jobInstance.getErrorMessage())); @@ -211,7 +214,8 @@ public class TaskInstanceServiceImpl implements ITaskInstanceService<SeaTunnelJo Integer userId, long jobInstanceId) { 
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); if (jobInstance == null) { - return Result.failure(404, "Job instance not found"); + throw new SeatunnelException( + SeatunnelErrorEnum.RESOURCE_NOT_FOUND, "Job instance not found"); } SeaTunnelJobInstanceDto executionDetails = convertToDto(jobInstance); populateExecutionMetricsData( diff --git a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java index 5f4b218e..3059eb39 100644 --- a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java +++ b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java @@ -129,6 +129,7 @@ public enum SeatunnelErrorEnum { MISSING_PARAM(1777000, "param miss [{0}]", "param miss [{0}]"), PARAM_CAN_NOT_BE_NULL(60018, "", "param [%s] can not be null or empty"), INVALID_PARAM(60019, "", "param [%s] is invalid. 
%s"), + RESOURCE_NOT_FOUND(404, "", "%s"), ; private final int code; diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java index 61bc2f5f..b899835f 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobControllerWrapper.java @@ -19,6 +19,7 @@ package org.apache.seatunnel.app.controller; import org.apache.seatunnel.app.common.Result; import org.apache.seatunnel.app.common.SeatunnelWebTestingBase; import org.apache.seatunnel.app.domain.request.job.JobCreateReq; +import org.apache.seatunnel.app.domain.response.job.JobRes; import org.apache.seatunnel.app.utils.JSONTestUtils; import org.apache.seatunnel.app.utils.JSONUtils; @@ -36,6 +37,18 @@ public class JobControllerWrapper extends SeatunnelWebTestingBase { return JSONTestUtils.parseObject(response, new TypeReference<Result<Long>>() {}); } + public Result<Void> updateJob(long jobVersionId, JobCreateReq jobCreateReq) { + String requestBody = JSONUtils.toPrettyJsonString(jobCreateReq); + String response = + sendRequest(urlWithParam("job/update/" + jobVersionId + "?"), requestBody, "PUT"); + return JSONTestUtils.parseObject(response, new TypeReference<Result<Void>>() {}); + } + + public Result<JobRes> getJob(long jobVersionId) { + String response = sendRequest(urlWithParam("job/get/" + jobVersionId + "?"), null, "GET"); + return JSONTestUtils.parseObject(response, new TypeReference<Result<JobRes>>() {}); + } + public JobCreateReq populateJobCreateReqFromFile() { String filePath = "src/test/resources/jobs/fake_source_console_job.json"; String jsonContent; diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java index 
860ce20e..b898ebd9 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java @@ -23,7 +23,10 @@ import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper; import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper; import org.apache.seatunnel.app.domain.request.job.JobConfig; import org.apache.seatunnel.app.domain.request.job.JobCreateReq; +import org.apache.seatunnel.app.domain.request.job.JobDAG; import org.apache.seatunnel.app.domain.request.job.PluginConfig; +import org.apache.seatunnel.app.domain.response.job.JobConfigRes; +import org.apache.seatunnel.app.domain.response.job.JobRes; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; import org.apache.seatunnel.app.utils.JobUtils; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; @@ -35,6 +38,7 @@ import org.junit.jupiter.api.Test; import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -59,7 +63,7 @@ public class JobControllerTest { JobCreateReq jobCreateReq = jobControllerWrapper.populateJobCreateReqFromFile(); jobCreateReq.getJobConfig().setName(jobName); jobCreateReq.getJobConfig().setDescription(jobName + " description"); - setSourceIds(jobCreateReq, "fake_source_ds1" + uniqueId, "console_ds1" + uniqueId); + setSourceIds(jobCreateReq, "fake_source_create" + uniqueId, "console_create" + uniqueId); Result<Long> job = jobControllerWrapper.createJob(jobCreateReq); assertTrue(job.isSuccess()); @@ -124,13 +128,102 @@ public class JobControllerTest { result.getMsg()); jobConfig.getEnv().put("job.mode", "BATCH"); - setSourceIds(jobCreateReq, "fake_source_ds2" + uniqueId, "console_ds2" + uniqueId); + 
setSourceIds(jobCreateReq, "fake_source_create2" + uniqueId, "console_create2" + uniqueId); result = jobControllerWrapper.createJob(jobCreateReq); assertTrue(result.isSuccess()); assertEquals(0, result.getCode()); assertNotNull(result.getData()); } + @Test + public void testUpdateJob_ForValidAndInvalidScenarios() { + String jobName = "updateJob_single_api" + uniqueId; + JobCreateReq jobCreateReq = jobControllerWrapper.populateJobCreateReqFromFile(); + jobCreateReq.getJobConfig().setName(jobName); + jobCreateReq.getJobConfig().setDescription(jobName + " description"); + setSourceIds( + jobCreateReq, "fake_source_update_job" + uniqueId, "console_update_job" + uniqueId); + + Result<Long> job = jobControllerWrapper.createJob(jobCreateReq); + assertTrue(job.isSuccess()); + + Result<JobRes> getJobResponse = jobControllerWrapper.getJob(job.getData()); + assertTrue(getJobResponse.isSuccess()); + JobRes jobRes = getJobResponse.getData(); + assertNotNull(jobRes.getJobConfig()); + assertNotNull(jobRes.getJobConfig()); + assertNotNull(jobRes.getJobDAG()); + + assertEquals(jobName, jobRes.getJobConfig().getName()); + assertEquals( + jobCreateReq.getPluginConfigs().get(0).getName(), + jobRes.getPluginConfigs().get(0).getName()); + assertEquals( + jobCreateReq.getPluginConfigs().get(1).getName(), + jobRes.getPluginConfigs().get(1).getName()); + + JobCreateReq jobUpdateReq = convertJobResToJobCreateReq(jobRes); + String jobName2 = "updateJob_single_api2" + uniqueId; + jobUpdateReq.getJobConfig().setName(jobName2); + jobUpdateReq.getJobConfig().setDescription(jobName2 + " description"); + + Result<Void> jobUpdateResult = jobControllerWrapper.updateJob(job.getData(), jobUpdateReq); + assertTrue(jobUpdateResult.isSuccess()); + + Result<JobRes> getJobResponse2 = jobControllerWrapper.getJob(job.getData()); + assertTrue(getJobResponse2.isSuccess()); + JobRes jobRes2 = getJobResponse2.getData(); + assertEquals(jobName2, jobRes2.getJobConfig().getName()); + assertEquals( + 
jobUpdateReq.getPluginConfigs().get(0).getName(), + jobRes2.getPluginConfigs().get(0).getName()); + assertEquals( + jobUpdateReq.getPluginConfigs().get(1).getName(), + jobRes2.getPluginConfigs().get(1).getName()); + + // Handle error scenarios + + // Invalid job instance id + Result<JobRes> invalidJobInstanceIdResponse = jobControllerWrapper.getJob(123L); + assertFalse(invalidJobInstanceIdResponse.isSuccess()); + assertEquals( + SeatunnelErrorEnum.RESOURCE_NOT_FOUND.getCode(), + invalidJobInstanceIdResponse.getCode()); + + Result<Void> result = jobControllerWrapper.updateJob(123L, jobUpdateReq); + assertFalse(result.isSuccess()); + assertEquals(SeatunnelErrorEnum.RESOURCE_NOT_FOUND.getCode(), result.getCode()); + + // While doing job update some configuration is wrong. + jobUpdateReq.getJobDAG().getEdges().get(0).setInputPluginId("InvalidInputPluginId"); + jobUpdateReq.getJobDAG().getEdges().get(0).setTargetPluginId("InvalidTargetPluginId"); + jobUpdateResult = jobControllerWrapper.updateJob(job.getData(), jobUpdateReq); + assertFalse(jobUpdateResult.isSuccess()); + assertEquals(SeatunnelErrorEnum.ERROR_CONFIG.getCode(), jobUpdateResult.getCode()); + } + + private JobCreateReq convertJobResToJobCreateReq(JobRes jobRes) { + JobCreateReq jobCreateReq = new JobCreateReq(); + + // Assuming JobRes contains JobConfigRes and List<PluginConfig> and JobDAG + JobConfigRes jobConfigRes = jobRes.getJobConfig(); + List<PluginConfig> pluginConfigs = jobRes.getPluginConfigs(); + JobDAG jobDAG = jobRes.getJobDAG(); + + // Populate JobCreateReq with data from JobRes + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(jobConfigRes.getName()); + jobConfig.setDescription(jobConfigRes.getDescription()); + jobConfig.setEnv(jobConfigRes.getEnv()); + jobConfig.setEngine(jobConfigRes.getEngine()); + + jobCreateReq.setJobConfig(jobConfig); + jobCreateReq.setPluginConfigs(pluginConfigs); + jobCreateReq.setJobDAG(jobDAG); + + return jobCreateReq; + } + @AfterAll public static void 
tearDown() { seaTunnelWebCluster.stop();