This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push: new c0880eee [Improvement] [Seatunnel-web] Add API to get job execution status. (#197) c0880eee is described below commit c0880eeefd1b3f2242c3d42b799ddd5f15c5ce47 Author: Mohammad Arshad <ars...@apache.org> AuthorDate: Thu Aug 29 07:33:46 2024 +0530 [Improvement] [Seatunnel-web] Add API to get job execution status. (#197) --- .../app/controller/JobExecutorController.java | 20 ++++++++ .../seatunnel/app/dal/dao/IJobInstanceDao.java | 2 + .../app/dal/dao/impl/JobInstanceDaoImpl.java | 5 ++ .../app/dal/mapper/JobInstanceMapper.java | 2 + .../response/executor/JobExecutionStatus.java} | 25 +++++----- .../app/service/ITaskInstanceService.java | 6 +++ .../app/service/impl/TaskInstanceServiceImpl.java | 56 ++++++++++++++++++++-- .../seatunnel/app/dal/mapper/JobInstanceMapper.xml | 5 ++ .../controller/JobExecutorControllerWrapper.java | 16 +++++++ .../app/test/JobExecutorControllerTest.java | 40 +++++++++++----- 10 files changed, 149 insertions(+), 28 deletions(-) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java index d330f375..44fe7d95 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java @@ -18,10 +18,13 @@ package org.apache.seatunnel.app.controller; import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto; import org.apache.seatunnel.app.domain.request.job.JobExecParam; +import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus; import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes; import org.apache.seatunnel.app.service.IJobExecutorService; import org.apache.seatunnel.app.service.IJobInstanceService; +import org.apache.seatunnel.app.service.ITaskInstanceService; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; import org.apache.seatunnel.server.common.SeatunnelException; @@ -48,6 +51,7 @@ public class JobExecutorController { @Resource IJobExecutorService jobExecutorService; @Resource private IJobInstanceService jobInstanceService; + @Resource private ITaskInstanceService<SeaTunnelJobInstanceDto> taskInstanceService; @PostMapping("/execute") @ApiOperation(value = "Execute synchronization tasks", httpMethod = "POST") @@ -88,4 +92,20 @@ public class JobExecutorController { @ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) { return jobExecutorService.jobStore(userId, jobInstanceId); } + + @GetMapping("/status") + @ApiOperation(value = "get job execution status", httpMethod = "GET") + Result<JobExecutionStatus> getJobExecutionStatus( + @ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId, + @ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) { + return taskInstanceService.getJobExecutionStatus(userId, jobInstanceId); + } + + @GetMapping("/detail") + @ApiOperation(value = "get job execution status and some more details", httpMethod = "GET") + Result<SeaTunnelJobInstanceDto> getJobExecutionDetail( + @ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId, + @ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) { + return taskInstanceService.getJobExecutionDetail(userId, jobInstanceId); + } } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java index f4c5b8f1..44026af8 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java @@ -47,4 +47,6 @@ public interface IJobInstanceDao { String jobMode); List<JobInstance> getAllJobInstance(@NonNull List<Long> jobInstanceIdList); + + JobInstance getJobExecutionStatus(@NonNull Long jobInstanceId); } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java index 4eff1e18..02b529bf 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java @@ -88,4 +88,9 @@ public class JobInstanceDaoImpl implements IJobInstanceDao { return jobInstances; } + + @Override + public JobInstance getJobExecutionStatus(@NonNull Long jobInstanceId) { + return jobInstanceMapper.getJobExecutionStatus(jobInstanceId); + } } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java index 1ad784df..53b60a40 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java @@ -37,4 +37,6 @@ public interface JobInstanceMapper extends BaseMapper<JobInstance> { @Param("endTime") Date endTime, @Param("jobDefineId") Long jobDefineId, @Param("jobMode") String jobMode); + + JobInstance getJobExecutionStatus(@Param("jobInstanceId") Long jobInstanceId); } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java similarity index 61% copy from seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java copy to seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java index 77f2d115..833abd88 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java @@ -15,21 +15,18 @@ * limitations under the License. */ -package org.apache.seatunnel.app.service; +package org.apache.seatunnel.app.domain.response.executor; -import org.apache.seatunnel.app.common.Result; -import org.apache.seatunnel.app.utils.PageInfo; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; -public interface ITaskInstanceService<T> { +@Data +@AllArgsConstructor +@NoArgsConstructor +public class JobExecutionStatus { - Result<PageInfo<T>> getSyncTaskInstancePaging( - Integer userId, - String jobDefineName, - String executorName, - String stateType, - String startTime, - String endTime, - String syncTaskType, - Integer pageNo, - Integer pageSize); + private String jobStatus; + + private String errorMessage; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java index 77f2d115..ee74b244 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java @@ -18,6 +18,8 @@ package org.apache.seatunnel.app.service; import org.apache.seatunnel.app.common.Result; +import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto; +import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus; import org.apache.seatunnel.app.utils.PageInfo; public interface ITaskInstanceService<T> { @@ -32,4 +34,8 @@ public interface ITaskInstanceService<T> { String syncTaskType, Integer pageNo, Integer pageSize); + + Result<JobExecutionStatus> getJobExecutionStatus(Integer userId, long jobInstanceId); + + Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(Integer userId, long jobInstanceId); } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java index aa89c85c..0f7a2266 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java @@ -22,7 +22,9 @@ import org.apache.seatunnel.app.common.Status; import org.apache.seatunnel.app.dal.dao.IJobDefinitionDao; import org.apache.seatunnel.app.dal.dao.IJobInstanceDao; import org.apache.seatunnel.app.dal.entity.JobDefinition; +import org.apache.seatunnel.app.dal.entity.JobInstance; import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto; +import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus; import org.apache.seatunnel.app.domain.response.metrics.JobSummaryMetricsRes; import org.apache.seatunnel.app.service.BaseService; import org.apache.seatunnel.app.service.IJobDefinitionService; @@ -41,6 +43,7 @@ import lombok.extern.slf4j.Slf4j; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -103,15 +106,20 @@ public class TaskInstanceServiceImpl implements ITaskInstanceService<SeaTunnelJo if (CollectionUtils.isEmpty(records)) { return result; } - addJobDefineNameToResult(records); - addRunningTimeToResult(records); - jobPipelineSummaryMetrics(records, syncTaskType, userId); + populateExecutionMetricsData(userId, syncTaskType, records); pageInfo.setTotal((int) jobInstanceIPage.getTotal()); pageInfo.setTotalList(records); result.setData(pageInfo); return result; } + private void populateExecutionMetricsData( + Integer userId, String syncTaskType, List<SeaTunnelJobInstanceDto> records) { + addJobDefineNameToResult(records); + addRunningTimeToResult(records); + jobPipelineSummaryMetrics(records, syncTaskType, userId); + } + private void addRunningTimeToResult(List<SeaTunnelJobInstanceDto> records) { for (SeaTunnelJobInstanceDto jobInstanceDto : records) { long runningTime = 0l; @@ -187,4 +195,46 @@ public class TaskInstanceServiceImpl implements ITaskInstanceService<SeaTunnelJo } } } + + @Override + public Result<JobExecutionStatus> getJobExecutionStatus(Integer userId, long jobInstanceId) { + JobInstance jobInstance = jobInstanceDao.getJobExecutionStatus(jobInstanceId); + if (jobInstance == null) { + return Result.failure(404, "Job instance not found"); + } + return Result.success( + new JobExecutionStatus(jobInstance.getJobStatus(), jobInstance.getErrorMessage())); + } + + @Override + public Result<SeaTunnelJobInstanceDto> getJobExecutionDetail( + Integer userId, long jobInstanceId) { + JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); + if (jobInstance == null) { + return Result.failure(404, "Job instance not found"); + } + SeaTunnelJobInstanceDto executionDetails = convertToDto(jobInstance); + populateExecutionMetricsData( + userId, jobInstance.getJobType(), Collections.singletonList(executionDetails)); + return Result.success(executionDetails); + } + + private SeaTunnelJobInstanceDto convertToDto(JobInstance jobInstance) { + SeaTunnelJobInstanceDto dto = new SeaTunnelJobInstanceDto(); + dto.setId(jobInstance.getId()); + dto.setJobDefineId(jobInstance.getJobDefineId()); + dto.setJobStatus(jobInstance.getJobStatus()); + dto.setJobConfig(jobInstance.getJobConfig()); + dto.setEngineName(jobInstance.getEngineName()); + dto.setEngineVersion(jobInstance.getEngineVersion()); + dto.setJobEngineId(jobInstance.getJobEngineId()); + dto.setCreateUserId(jobInstance.getCreateUserId()); + dto.setUpdateUserId(jobInstance.getUpdateUserId()); + dto.setCreateTime(jobInstance.getCreateTime()); + dto.setUpdateTime(jobInstance.getUpdateTime()); + dto.setEndTime(jobInstance.getEndTime()); + dto.setJobType(jobInstance.getJobType()); + dto.setErrorMessage(jobInstance.getErrorMessage()); + return dto; + } } diff --git a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml index ee61b70c..55c44037 100644 --- a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml +++ b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml @@ -53,4 +53,9 @@ </where> ORDER BY create_time DESC </select> + <select id="getJobExecutionStatus" resultType="org.apache.seatunnel.app.dal.entity.JobInstance"> + SELECT `job_status`, `error_message` + FROM t_st_job_instance t + WHERE t.id = #{jobInstanceId} + </select> </mapper> diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java index c7732548..f78fae34 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java @@ -18,7 +18,9 @@ package org.apache.seatunnel.app.controller; import org.apache.seatunnel.app.common.Result; import org.apache.seatunnel.app.common.SeatunnelWebTestingBase; +import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto; import org.apache.seatunnel.app.domain.request.job.JobExecParam; +import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus; import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes; import org.apache.seatunnel.app.utils.JSONTestUtils; import org.apache.seatunnel.app.utils.JSONUtils; @@ -63,4 +65,18 @@ public class JobExecutorControllerWrapper extends SeatunnelWebTestingBase { sendRequest(urlWithParam("job/executor/restore?jobInstanceId=" + jobInstanceId)); return JSONTestUtils.parseObject(response, Result.class); } + + public Result<JobExecutionStatus> getJobExecutionStatus(Long jobInstanceId) { + String response = + sendRequest(urlWithParam("job/executor/status?jobInstanceId=" + jobInstanceId)); + return JSONTestUtils.parseObject( + response, new TypeReference<Result<JobExecutionStatus>>() {}); + } + + public Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(Long jobInstanceId) { + String response = + sendRequest(urlWithParam("job/executor/detail?jobInstanceId=" + jobInstanceId)); + return JSONTestUtils.parseObject( + response, new TypeReference<Result<SeaTunnelJobInstanceDto>>() {}); + } } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java index 8d20497b..50a4e419 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java @@ -21,15 +21,16 @@ import org.apache.seatunnel.app.common.SeaTunnelWebCluster; import org.apache.seatunnel.app.controller.JobControllerWrapper; import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper; import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper; -import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper; import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto; import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq; import org.apache.seatunnel.app.domain.request.job.JobCreateReq; import org.apache.seatunnel.app.domain.request.job.JobExecParam; import org.apache.seatunnel.app.domain.request.job.PluginConfig; +import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus; import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; import org.apache.seatunnel.app.utils.JobUtils; +import org.apache.seatunnel.engine.core.job.JobStatus; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -53,7 +54,6 @@ public class JobExecutorControllerTest { private static final String uniqueId = "_" + System.currentTimeMillis(); private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper; private static JobControllerWrapper jobControllerWrapper; - private static TaskInstanceControllerWrapper taskInstanceControllerWrapper; @BeforeAll public static void setUp() { @@ -61,7 +61,6 @@ public class JobExecutorControllerTest { jobExecutorControllerWrapper = new JobExecutorControllerWrapper(); seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper(); jobControllerWrapper = new JobControllerWrapper(); - taskInstanceControllerWrapper = new TaskInstanceControllerWrapper(); } @Test @@ -277,10 +276,19 @@ public class JobExecutorControllerTest { // Fails because of the wrong database credentials. assertFalse(result.isSuccess()); // Even though job failed but job instance is created into the database. - assertTrue(result.getData() > 0); - SeaTunnelJobInstanceDto taskInstanceList = - taskInstanceControllerWrapper.getTaskInstanceList(jobName); - assertNotNull(taskInstanceList.getErrorMessage()); + Long jobInstanceId = result.getData(); + assertTrue(jobInstanceId > 0); + Result<JobExecutionStatus> jobExecutionStatusResult = + jobExecutorControllerWrapper.getJobExecutionStatus(jobInstanceId); + assertTrue(jobExecutionStatusResult.isSuccess()); + assertEquals(JobStatus.FAILED.name(), jobExecutionStatusResult.getData().getJobStatus()); + assertNotNull(jobExecutionStatusResult.getData().getErrorMessage()); + + // Invalid jobInstanceId + Result<JobExecutionStatus> jobExecutionStatusResult2 = + jobExecutorControllerWrapper.getJobExecutionStatus(123L); + assertFalse(jobExecutionStatusResult2.isSuccess()); + assertEquals(404, jobExecutionStatusResult2.getCode()); } @Test @@ -291,12 +299,22 @@ public class JobExecutorControllerTest { // job submitted successfully but it will fail during execution assertTrue(result.isSuccess()); assertTrue(result.getData() > 0); - JobUtils.waitForJobCompletion(result.getData()); + Long jobInstanceId = result.getData(); + JobUtils.waitForJobCompletion(jobInstanceId); // extra second to let the data get updated in the database Thread.sleep(2000); - SeaTunnelJobInstanceDto taskInstanceList = - taskInstanceControllerWrapper.getTaskInstanceList(jobName); - assertNotNull(taskInstanceList.getErrorMessage()); + Result<SeaTunnelJobInstanceDto> jobExecutionDetailResult = + jobExecutorControllerWrapper.getJobExecutionDetail(jobInstanceId); + assertTrue(jobExecutionDetailResult.isSuccess()); + assertEquals(JobStatus.FAILED.name(), jobExecutionDetailResult.getData().getJobStatus()); + assertNotNull(jobExecutionDetailResult.getData().getErrorMessage()); + assertNotNull(jobExecutionDetailResult.getData().getJobDefineName()); + + // Invalid jobInstanceId + Result<SeaTunnelJobInstanceDto> jobExecutionDetailResult2 = + jobExecutorControllerWrapper.getJobExecutionDetail(123L); + assertFalse(jobExecutionDetailResult2.isSuccess()); + assertEquals(404, jobExecutionDetailResult2.getCode()); } @AfterAll