This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new c0880eee [Improvement] [Seatunnel-web] Add API to get job execution status. (#197)
c0880eee is described below

commit c0880eeefd1b3f2242c3d42b799ddd5f15c5ce47
Author: Mohammad Arshad <ars...@apache.org>
AuthorDate: Thu Aug 29 07:33:46 2024 +0530

    [Improvement] [Seatunnel-web] Add API to get job execution status. (#197)
---
 .../app/controller/JobExecutorController.java      | 20 ++++++++
 .../seatunnel/app/dal/dao/IJobInstanceDao.java     |  2 +
 .../app/dal/dao/impl/JobInstanceDaoImpl.java       |  5 ++
 .../app/dal/mapper/JobInstanceMapper.java          |  2 +
 .../response/executor/JobExecutionStatus.java}     | 25 +++++-----
 .../app/service/ITaskInstanceService.java          |  6 +++
 .../app/service/impl/TaskInstanceServiceImpl.java  | 56 ++++++++++++++++++++--
 .../seatunnel/app/dal/mapper/JobInstanceMapper.xml |  5 ++
 .../controller/JobExecutorControllerWrapper.java   | 16 +++++++
 .../app/test/JobExecutorControllerTest.java        | 40 +++++++++++-----
 10 files changed, 149 insertions(+), 28 deletions(-)

diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
index d330f375..44fe7d95 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java
@@ -18,10 +18,13 @@
 package org.apache.seatunnel.app.controller;
 
 import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
 import org.apache.seatunnel.app.domain.request.job.JobExecParam;
+import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
 import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
 import org.apache.seatunnel.app.service.IJobExecutorService;
 import org.apache.seatunnel.app.service.IJobInstanceService;
+import org.apache.seatunnel.app.service.ITaskInstanceService;
 import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
 import org.apache.seatunnel.server.common.SeatunnelException;
 
@@ -48,6 +51,7 @@ public class JobExecutorController {
 
     @Resource IJobExecutorService jobExecutorService;
     @Resource private IJobInstanceService jobInstanceService;
+    @Resource private ITaskInstanceService<SeaTunnelJobInstanceDto> taskInstanceService;
 
     @PostMapping("/execute")
     @ApiOperation(value = "Execute synchronization tasks", httpMethod = "POST")
@@ -88,4 +92,20 @@ public class JobExecutorController {
             @ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) {
         return jobExecutorService.jobStore(userId, jobInstanceId);
     }
+
+    @GetMapping("/status")
+    @ApiOperation(value = "get job execution status", httpMethod = "GET")
+    Result<JobExecutionStatus> getJobExecutionStatus(
+            @ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
+            @ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) {
+        return taskInstanceService.getJobExecutionStatus(userId, jobInstanceId);
+    }
+
+    @GetMapping("/detail")
+    @ApiOperation(value = "get job execution status and some more details", httpMethod = "GET")
+    Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(
+            @ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
+            @ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) {
+        return taskInstanceService.getJobExecutionDetail(userId, jobInstanceId);
+    }
 }
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
index f4c5b8f1..44026af8 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
@@ -47,4 +47,6 @@ public interface IJobInstanceDao {
             String jobMode);
 
     List<JobInstance> getAllJobInstance(@NonNull List<Long> jobInstanceIdList);
+
+    JobInstance getJobExecutionStatus(@NonNull Long jobInstanceId);
 }
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
index 4eff1e18..02b529bf 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
@@ -88,4 +88,9 @@ public class JobInstanceDaoImpl implements IJobInstanceDao {
 
         return jobInstances;
     }
+
+    @Override
+    public JobInstance getJobExecutionStatus(@NonNull Long jobInstanceId) {
+        return jobInstanceMapper.getJobExecutionStatus(jobInstanceId);
+    }
 }
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
index 1ad784df..53b60a40 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
@@ -37,4 +37,6 @@ public interface JobInstanceMapper extends BaseMapper<JobInstance> {
             @Param("endTime") Date endTime,
             @Param("jobDefineId") Long jobDefineId,
             @Param("jobMode") String jobMode);
+
+    JobInstance getJobExecutionStatus(@Param("jobInstanceId") Long jobInstanceId);
 }
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java
similarity index 61%
copy from seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
copy to seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java
index 77f2d115..833abd88 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java
@@ -15,21 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.app.service;
+package org.apache.seatunnel.app.domain.response.executor;
 
-import org.apache.seatunnel.app.common.Result;
-import org.apache.seatunnel.app.utils.PageInfo;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-public interface ITaskInstanceService<T> {
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class JobExecutionStatus {
 
-    Result<PageInfo<T>> getSyncTaskInstancePaging(
-            Integer userId,
-            String jobDefineName,
-            String executorName,
-            String stateType,
-            String startTime,
-            String endTime,
-            String syncTaskType,
-            Integer pageNo,
-            Integer pageSize);
+    private String jobStatus;
+
+    private String errorMessage;
 }
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
index 77f2d115..ee74b244 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITaskInstanceService.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.app.service;
 
 import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
+import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
 import org.apache.seatunnel.app.utils.PageInfo;
 
 public interface ITaskInstanceService<T> {
@@ -32,4 +34,8 @@ public interface ITaskInstanceService<T> {
             String syncTaskType,
             Integer pageNo,
             Integer pageSize);
+
+    Result<JobExecutionStatus> getJobExecutionStatus(Integer userId, long jobInstanceId);
+
+    Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(Integer userId, long jobInstanceId);
 }
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
index aa89c85c..0f7a2266 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TaskInstanceServiceImpl.java
@@ -22,7 +22,9 @@ import org.apache.seatunnel.app.common.Status;
 import org.apache.seatunnel.app.dal.dao.IJobDefinitionDao;
 import org.apache.seatunnel.app.dal.dao.IJobInstanceDao;
 import org.apache.seatunnel.app.dal.entity.JobDefinition;
+import org.apache.seatunnel.app.dal.entity.JobInstance;
 import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
+import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
 import org.apache.seatunnel.app.domain.response.metrics.JobSummaryMetricsRes;
 import org.apache.seatunnel.app.service.BaseService;
 import org.apache.seatunnel.app.service.IJobDefinitionService;
@@ -41,6 +43,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -103,15 +106,20 @@ public class TaskInstanceServiceImpl implements ITaskInstanceService<SeaTunnelJo
         if (CollectionUtils.isEmpty(records)) {
             return result;
         }
-        addJobDefineNameToResult(records);
-        addRunningTimeToResult(records);
-        jobPipelineSummaryMetrics(records, syncTaskType, userId);
+        populateExecutionMetricsData(userId, syncTaskType, records);
         pageInfo.setTotal((int) jobInstanceIPage.getTotal());
         pageInfo.setTotalList(records);
         result.setData(pageInfo);
         return result;
     }
 
+    private void populateExecutionMetricsData(
+            Integer userId, String syncTaskType, List<SeaTunnelJobInstanceDto> records) {
+        addJobDefineNameToResult(records);
+        addRunningTimeToResult(records);
+        jobPipelineSummaryMetrics(records, syncTaskType, userId);
+    }
+
     private void addRunningTimeToResult(List<SeaTunnelJobInstanceDto> records) {
         for (SeaTunnelJobInstanceDto jobInstanceDto : records) {
             long runningTime = 0l;
@@ -187,4 +195,46 @@ public class TaskInstanceServiceImpl implements ITaskInstanceService<SeaTunnelJo
             }
         }
     }
+
+    @Override
+    public Result<JobExecutionStatus> getJobExecutionStatus(Integer userId, long jobInstanceId) {
+        JobInstance jobInstance = jobInstanceDao.getJobExecutionStatus(jobInstanceId);
+        if (jobInstance == null) {
+            return Result.failure(404, "Job instance not found");
+        }
+        return Result.success(
+                new JobExecutionStatus(jobInstance.getJobStatus(), jobInstance.getErrorMessage()));
+    }
+
+    @Override
+    public Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(
+            Integer userId, long jobInstanceId) {
+        JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
+        if (jobInstance == null) {
+            return Result.failure(404, "Job instance not found");
+        }
+        SeaTunnelJobInstanceDto executionDetails = convertToDto(jobInstance);
+        populateExecutionMetricsData(
+                userId, jobInstance.getJobType(), Collections.singletonList(executionDetails));
+        return Result.success(executionDetails);
+    }
+
+    private SeaTunnelJobInstanceDto convertToDto(JobInstance jobInstance) {
+        SeaTunnelJobInstanceDto dto = new SeaTunnelJobInstanceDto();
+        dto.setId(jobInstance.getId());
+        dto.setJobDefineId(jobInstance.getJobDefineId());
+        dto.setJobStatus(jobInstance.getJobStatus());
+        dto.setJobConfig(jobInstance.getJobConfig());
+        dto.setEngineName(jobInstance.getEngineName());
+        dto.setEngineVersion(jobInstance.getEngineVersion());
+        dto.setJobEngineId(jobInstance.getJobEngineId());
+        dto.setCreateUserId(jobInstance.getCreateUserId());
+        dto.setUpdateUserId(jobInstance.getUpdateUserId());
+        dto.setCreateTime(jobInstance.getCreateTime());
+        dto.setUpdateTime(jobInstance.getUpdateTime());
+        dto.setEndTime(jobInstance.getEndTime());
+        dto.setJobType(jobInstance.getJobType());
+        dto.setErrorMessage(jobInstance.getErrorMessage());
+        return dto;
+    }
 }
diff --git a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
index ee61b70c..55c44037 100644
--- a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
+++ b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
@@ -53,4 +53,9 @@
         </where>
         ORDER BY create_time DESC
     </select>
+    <select id="getJobExecutionStatus" resultType="org.apache.seatunnel.app.dal.entity.JobInstance">
+        SELECT `job_status`, `error_message`
+        FROM t_st_job_instance t
+        WHERE t.id = #{jobInstanceId}
+    </select>
 </mapper>
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
index c7732548..f78fae34 100644
--- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/JobExecutorControllerWrapper.java
@@ -18,7 +18,9 @@ package org.apache.seatunnel.app.controller;
 
 import org.apache.seatunnel.app.common.Result;
 import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
+import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
 import org.apache.seatunnel.app.domain.request.job.JobExecParam;
+import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
 import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
 import org.apache.seatunnel.app.utils.JSONTestUtils;
 import org.apache.seatunnel.app.utils.JSONUtils;
@@ -63,4 +65,18 @@ public class JobExecutorControllerWrapper extends SeatunnelWebTestingBase {
                 sendRequest(urlWithParam("job/executor/restore?jobInstanceId=" + jobInstanceId));
         return JSONTestUtils.parseObject(response, Result.class);
     }
+
+    public Result<JobExecutionStatus> getJobExecutionStatus(Long jobInstanceId) {
+        String response =
+                sendRequest(urlWithParam("job/executor/status?jobInstanceId=" + jobInstanceId));
+        return JSONTestUtils.parseObject(
+                response, new TypeReference<Result<JobExecutionStatus>>() {});
+    }
+
+    public Result<SeaTunnelJobInstanceDto> getJobExecutionDetail(Long jobInstanceId) {
+        String response =
+                sendRequest(urlWithParam("job/executor/detail?jobInstanceId=" + jobInstanceId));
+        return JSONTestUtils.parseObject(
+                response, new TypeReference<Result<SeaTunnelJobInstanceDto>>() {});
+    }
 }
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
index 8d20497b..50a4e419 100644
--- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -21,15 +21,16 @@ import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
 import org.apache.seatunnel.app.controller.JobControllerWrapper;
 import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
 import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
-import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper;
 import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
 import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq;
 import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
 import org.apache.seatunnel.app.domain.request.job.JobExecParam;
 import org.apache.seatunnel.app.domain.request.job.PluginConfig;
+import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
 import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
 import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
 import org.apache.seatunnel.app.utils.JobUtils;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -53,7 +54,6 @@ public class JobExecutorControllerTest {
     private static final String uniqueId = "_" + System.currentTimeMillis();
     private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper;
     private static JobControllerWrapper jobControllerWrapper;
-    private static TaskInstanceControllerWrapper taskInstanceControllerWrapper;
 
     @BeforeAll
     public static void setUp() {
@@ -61,7 +61,6 @@ public class JobExecutorControllerTest {
         jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
         seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper();
         jobControllerWrapper = new JobControllerWrapper();
-        taskInstanceControllerWrapper = new TaskInstanceControllerWrapper();
     }
 
     @Test
@@ -277,10 +276,19 @@ public class JobExecutorControllerTest {
         // Fails because of the wrong database credentials.
         assertFalse(result.isSuccess());
         // Even though job failed but job instance is created into the database.
-        assertTrue(result.getData() > 0);
-        SeaTunnelJobInstanceDto taskInstanceList =
-                taskInstanceControllerWrapper.getTaskInstanceList(jobName);
-        assertNotNull(taskInstanceList.getErrorMessage());
+        Long jobInstanceId = result.getData();
+        assertTrue(jobInstanceId > 0);
+        Result<JobExecutionStatus> jobExecutionStatusResult =
+                jobExecutorControllerWrapper.getJobExecutionStatus(jobInstanceId);
+        assertTrue(jobExecutionStatusResult.isSuccess());
+        assertEquals(JobStatus.FAILED.name(), jobExecutionStatusResult.getData().getJobStatus());
+        assertNotNull(jobExecutionStatusResult.getData().getErrorMessage());
+
+        // Invalid jobInstanceId
+        Result<JobExecutionStatus> jobExecutionStatusResult2 =
+                jobExecutorControllerWrapper.getJobExecutionStatus(123L);
+        assertFalse(jobExecutionStatusResult2.isSuccess());
+        assertEquals(404, jobExecutionStatusResult2.getCode());
     }
 
     @Test
@@ -291,12 +299,22 @@ public class JobExecutorControllerTest {
         // job submitted successfully but it will fail during execution
         assertTrue(result.isSuccess());
         assertTrue(result.getData() > 0);
-        JobUtils.waitForJobCompletion(result.getData());
+        Long jobInstanceId = result.getData();
+        JobUtils.waitForJobCompletion(jobInstanceId);
         // extra second to let the data get updated in the database
         Thread.sleep(2000);
-        SeaTunnelJobInstanceDto taskInstanceList =
-                taskInstanceControllerWrapper.getTaskInstanceList(jobName);
-        assertNotNull(taskInstanceList.getErrorMessage());
+        Result<SeaTunnelJobInstanceDto> jobExecutionDetailResult =
+                jobExecutorControllerWrapper.getJobExecutionDetail(jobInstanceId);
+        assertTrue(jobExecutionDetailResult.isSuccess());
+        assertEquals(JobStatus.FAILED.name(), jobExecutionDetailResult.getData().getJobStatus());
+        assertNotNull(jobExecutionDetailResult.getData().getErrorMessage());
+        assertNotNull(jobExecutionDetailResult.getData().getJobDefineName());
+
+        // Invalid jobInstanceId
+        Result<SeaTunnelJobInstanceDto> jobExecutionDetailResult2 =
+                jobExecutorControllerWrapper.getJobExecutionDetail(123L);
+        assertFalse(jobExecutionDetailResult2.isSuccess());
+        assertEquals(404, jobExecutionDetailResult2.getCode());
     }
 
     @AfterAll

Reply via email to