This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new 94d8a394 [Improve][SeaTunnel-Web] Change JobStatus to enum type to avoid hard coding (#205)
94d8a394 is described below

commit 94d8a394cb88e56c0cfdae1033154e96ab1c7789
Author: ChunFuWu <319355...@qq.com>
AuthorDate: Sat Sep 7 16:32:20 2024 +0800

    [Improve][SeaTunnel-Web] Change JobStatus to enum type to avoid hard coding (#205)
---
 .../seatunnel/app/dal/entity/JobInstance.java      |  4 ++-
 .../seatunnel/app/dal/entity/JobMetrics.java       |  4 ++-
 .../response/executor/JobExecutionStatus.java      |  4 ++-
 .../metrics/JobPipelineDetailMetricsRes.java       |  4 ++-
 .../metrics/JobPipelineSummaryMetricsRes.java      |  4 ++-
 .../response/metrics/JobSummaryMetricsRes.java     |  4 ++-
 .../app/domain/response/task/JobSimpleInfoRes.java |  4 ++-
 .../app/service/impl/JobExecutorServiceImpl.java   |  9 +++--
 .../app/service/impl/JobInstanceServiceImpl.java   |  2 +-
 .../app/service/impl/JobMetricsServiceImpl.java    | 42 +++++++++++-----------
 .../engine/SeaTunnelEngineMetricsExtractor.java    | 34 +++++++++---------
 .../thirdparty/engine/SeaTunnelEngineProxy.java    |  5 +--
 .../metrics/IEngineMetricsExtractor.java           |  3 +-
 .../org/apache/seatunnel/app/utils/JobUtils.java   |  9 ++---
 .../seatunnel/app/test/JobControllerTest.java      |  3 +-
 .../app/test/JobExecutorControllerTest.java        | 10 +++---
 .../app/test/TaskInstanceControllerTest.java       |  3 +-
 .../seatunnel/app/utils/JobTestingUtils.java       |  6 ++--
 18 files changed, 88 insertions(+), 66 deletions(-)

diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
index db5cbeba..221867d8 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.app.dal.entity;
 
+import org.apache.seatunnel.engine.core.job.JobStatus;
+
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
@@ -42,7 +44,7 @@ public class JobInstance {
     private Long jobDefineId;
 
     @TableField("job_status")
-    private String jobStatus;
+    private JobStatus jobStatus;
 
     @TableField("job_config")
     private String jobConfig;
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetrics.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetrics.java
index 633ad645..6706c592 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetrics.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetrics.java
@@ -16,6 +16,8 @@
  */
 package org.apache.seatunnel.app.dal.entity;
 
+import org.apache.seatunnel.engine.core.job.JobStatus;
+
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
@@ -65,7 +67,7 @@ public class JobMetrics {
     private long recordDelay;
 
     @TableField("status")
-    private String status;
+    private JobStatus status;
 
     @TableField("create_user_id")
     private Integer createUserId;
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java
index 833abd88..8ab8c4c8 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.app.domain.response.executor;
 
+import org.apache.seatunnel.engine.core.job.JobStatus;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -26,7 +28,7 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 public class JobExecutionStatus {
 
-    private String jobStatus;
+    private JobStatus jobStatus;
 
     private String errorMessage;
 }
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineDetailMetricsRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineDetailMetricsRes.java
index b864817c..a18c7c81 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineDetailMetricsRes.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineDetailMetricsRes.java
@@ -16,6 +16,8 @@
  */
 package org.apache.seatunnel.app.domain.response.metrics;
 
+import org.apache.seatunnel.engine.core.job.JobStatus;
+
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -45,5 +47,5 @@ public class JobPipelineDetailMetricsRes {
 
     private long recordDelay;
 
-    private String status;
+    private JobStatus status;
 }
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineSummaryMetricsRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineSummaryMetricsRes.java
index cac0499e..4c9a3211 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineSummaryMetricsRes.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineSummaryMetricsRes.java
@@ -16,6 +16,8 @@
  */
 package org.apache.seatunnel.app.domain.response.metrics;
 
+import org.apache.seatunnel.engine.core.job.JobStatus;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
@@ -28,5 +30,5 @@ public class JobPipelineSummaryMetricsRes {
 
     private long writeRowCount;
 
-    private String status;
+    private JobStatus status;
 }
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobSummaryMetricsRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobSummaryMetricsRes.java
index d4b36fc5..0bb943b5 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobSummaryMetricsRes.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobSummaryMetricsRes.java
@@ -16,6 +16,8 @@
  */
 package org.apache.seatunnel.app.domain.response.metrics;
 
+import org.apache.seatunnel.engine.core.job.JobStatus;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
@@ -30,5 +32,5 @@ public class JobSummaryMetricsRes {
 
     private long writeRowCount;
 
-    private String status;
+    private JobStatus status;
 }
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/task/JobSimpleInfoRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/task/JobSimpleInfoRes.java
index 22fda68c..d66c4640 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/task/JobSimpleInfoRes.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/task/JobSimpleInfoRes.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.app.domain.response.task;
 
+import org.apache.seatunnel.engine.core.job.JobStatus;
+
 import com.fasterxml.jackson.annotation.JsonProperty;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
@@ -40,7 +42,7 @@ public class JobSimpleInfoRes {
     private String jobName;
 
     @ApiModelProperty(value = "job status", dataType = "String")
-    private String jobStatus;
+    private JobStatus jobStatus;
 
     @ApiModelProperty(value = "job plan", dataType = "String")
     private String jobPlan;
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index 15250caa..ec1b37de 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -55,7 +55,6 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.Date;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -128,7 +127,7 @@ public class JobExecutorServiceImpl implements IJobExecutorService {
         } catch (Throwable e) {
             log.error("Job execution submission failed.", e);
             JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
-            jobInstance.setJobStatus(JobStatus.FAILED.name());
+            jobInstance.setJobStatus(JobStatus.FAILED);
             jobInstance.setEndTime(new Date());
             String jobInstanceErrorMessage = JobUtils.getJobInstanceErrorMessage(e.getMessage());
             jobInstance.setErrorMessage(jobInstanceErrorMessage);
@@ -183,14 +182,14 @@ public class JobExecutorServiceImpl implements IJobExecutorService {
     @Override
     public Result<Void> jobPause(Integer userId, Long jobInstanceId) {
         JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
-        if (Objects.equals(
-                getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId()), "RUNNING")) {
+        if (getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId())
+                == JobStatus.RUNNING) {
             pauseJobInEngine(jobInstance.getJobEngineId());
         }
         return Result.success();
     }
 
-    private String getJobStatusFromEngine(@NonNull JobInstance jobInstance, String jobEngineId) {
+    private JobStatus getJobStatusFromEngine(@NonNull JobInstance jobInstance, String jobEngineId) {
 
         Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion());
 
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
index 8dff7bff..d58d19b4 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
@@ -366,7 +366,7 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl
         funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_COMPLETE, userId);
         JobInstance jobInstance = jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId);
         jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId);
-        jobInstance.setJobStatus(jobResult.getStatus().name());
+        jobInstance.setJobStatus(jobResult.getStatus());
         jobInstance.setJobEngineId(jobEngineId);
         jobInstance.setUpdateUserId(userId);
         jobInstance.setErrorMessage(JobUtils.getJobInstanceErrorMessage(jobResult.getError()));
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
index d65679b0..f424aee0 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy;
 import org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory;
 import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
 import org.apache.seatunnel.app.utils.JobUtils;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.server.common.CodeGenerateUtils;
 import org.apache.seatunnel.server.common.Constants;
 import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
@@ -89,7 +90,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
         Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion());
         IEngineMetricsExtractor engineMetricsExtractor =
                 (new EngineMetricsExtractorFactory(engine)).getEngineMetricsExtractor();
-        String jobStatus = engineMetricsExtractor.getJobStatus(jobEngineId);
+        JobStatus jobStatus = engineMetricsExtractor.getJobStatus(jobEngineId);
 
         List<JobMetrics> jobPipelineDetailMetrics =
                 getJobPipelineDetailMetrics(jobInstance, userId);
@@ -160,8 +161,8 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
             log.info("jobEngineId={}", jobInstance.getJobEngineId());
 
             if (jobInstance.getJobStatus() == null
-                    || jobInstance.getJobStatus().equals("FAILED")
-                    || jobInstance.getJobStatus().equals("RUNNING")) {
+                    || jobInstance.getJobStatus() == JobStatus.FAILED
+                    || jobInstance.getJobStatus() == JobStatus.RUNNING) {
                 // Obtain monitoring information from the collection of running jobs returned from
                 // the engine
                 if (!allRunningJobMetricsFromEngine.isEmpty()
@@ -192,15 +193,15 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
                     if (jobMetricsFromDb != null) {
                         jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromDb);
                     }
-                    if ("RUNNING".equals(jobInstance.getJobStatus())) {
+                    if (jobInstance.getJobStatus() == JobStatus.RUNNING) {
                         // Set the job status of jobInstance and jobMetrics in the database to
                         // finished
-                        jobInstance.setJobStatus("FINISHED");
+                        jobInstance.setJobStatus(JobStatus.FINISHED);
                         jobInstanceDao.getJobInstanceMapper().updateById(jobInstance);
                     }
                 }
-            } else if (jobInstance.getJobStatus().equals("FINISHED")
-                    || jobInstance.getJobStatus().equals("CANCELED")) {
+            } else if (jobInstance.getJobStatus() == JobStatus.FINISHED
+                    || jobInstance.getJobStatus() == JobStatus.CANCELED) {
                 // If the status of the job is finished or cancelled, the monitoring information is
                 // directly obtained from MySQL
                 JobSummaryMetricsRes jobMetricsFromDb =
@@ -222,7 +223,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
             Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine,
             Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
             Integer userId) {
-        jobInstance.setJobStatus("RUNNING");
+        jobInstance.setJobStatus(JobStatus.RUNNING);
         HashMap<Integer, JobMetrics> jobMetricsFromEngine =
                 allRunningJobMetricsFromEngine.get(
                         jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId()));
@@ -246,7 +247,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
                                     jobMetricsFromEngine
                                             .get(jobMetrics.getPipelineId())
                                             .getWriteRowCount()));
-            jobMetricsFromDb.forEach(jobMetrics -> jobMetrics.setStatus("RUNNING"));
+            jobMetricsFromDb.forEach(jobMetrics -> jobMetrics.setStatus(JobStatus.RUNNING));
 
             updateJobInstanceAndMetrics(jobInstance, jobMetricsFromDb);
         }
@@ -265,7 +266,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
 
             try {
                 if (jobInstance.getJobStatus() != null
-                        && jobInstance.getJobStatus().equals("CANCELED")) {
+                        && jobInstance.getJobStatus() == JobStatus.CANCELED) {
                     // If the status of the job is finished or cancelled
                     // the monitoring information is directly obtained from MySQL
                     JobSummaryMetricsRes jobMetricsFromDb =
@@ -278,8 +279,8 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
                     jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromDb);
 
                 } else if (jobInstance.getJobStatus() != null
-                        && (jobInstance.getJobStatus().equals("FINISHED")
-                                || jobInstance.getJobStatus().equals("FAILED"))) {
+                        && (jobInstance.getJobStatus() == JobStatus.FINISHED
+                                || jobInstance.getJobStatus() == JobStatus.FAILED)) {
                     // Obtain monitoring information from the collection of running jobs returned
                     // from
                     // the engine
@@ -332,9 +333,9 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
                                         jobInstance);
                         jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromEngineRes);
                     } else {
-                        String jobStatusByJobEngineId = null;
+                        JobStatus jobStatus = null;
                         try {
-                            jobStatusByJobEngineId =
+                            jobStatus =
                                     getJobStatusByJobEngineId(
                                             String.valueOf(
                                                     jobInstanceIdAndJobEngineIdMap.get(
@@ -345,8 +346,8 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
                                     jobInstance.getId());
                         }
 
-                        if (jobStatusByJobEngineId != null) {
-                            jobInstance.setJobStatus(jobStatusByJobEngineId);
+                        if (jobStatus != null) {
+                            jobInstance.setJobStatus(jobStatus);
                             jobInstanceDao.update(jobInstance);
                             JobSummaryMetricsRes jobSummaryMetricsResByDb =
                                     getJobSummaryMetricsResByDb(
@@ -365,7 +366,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
                                                     jobInstanceIdAndJobEngineIdMap.get(
                                                             jobInstance.getId())));
                             if (!jobMetricsFromDb.isEmpty()) {
-                                String finalJobStatusByJobEngineId = jobStatusByJobEngineId;
+                                JobStatus finalJobStatusByJobEngineId = jobStatus;
                                 jobMetricsFromDb.forEach(
                                         jobMetrics ->
                                                 jobMetrics.setStatus(finalJobStatusByJobEngineId));
@@ -402,7 +403,8 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
 
         log.info("jobInstance={}", jobInstance);
 
-        return new JobSummaryMetricsRes(jobInstance.getId(), 1L, readCount, writeCount, "RUNNING");
+        return new JobSummaryMetricsRes(
+                jobInstance.getId(), 1L, readCount, writeCount, JobStatus.RUNNING);
     }
 
     private JobSummaryMetricsRes getJobSummaryMetricsResByDb(
@@ -442,7 +444,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
         }
     }
 
-    private String getJobStatusByJobEngineId(String jobEngineId) {
+    private JobStatus getJobStatusByJobEngineId(String jobEngineId) {
         return SeaTunnelEngineProxy.getInstance().getJobStatus(jobEngineId);
     }
 
@@ -581,7 +583,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I
                 jobMetricsDao.getJobMetricsMapper().insertBatchMetrics(jobMetricsFromEngine);
             }
         } else {
-            String jobStatus = getJobStatusByJobEngineId(jobEngineId);
+            JobStatus jobStatus = getJobStatusByJobEngineId(jobEngineId);
             for (JobMetrics jobMetrics : jobMetricsFromDb) {
                 Integer pipelineId = jobMetrics.getPipelineId();
                 JobMetrics currentPiplinejobMetricsFromEngine =
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java
index 7f8053fc..b9003d82 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
 import org.apache.seatunnel.server.common.SeatunnelException;
 
@@ -161,14 +162,14 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor
 
     @Override
     public boolean isJobEnd(@NonNull String jobEngineId) {
-        String jobStatus = seaTunnelEngineProxy.getJobStatus(jobEngineId);
-        return "finished".equalsIgnoreCase(jobStatus)
-                || "canceled".equalsIgnoreCase(jobStatus)
-                || "failed".equalsIgnoreCase(jobStatus);
+        JobStatus jobStatus = seaTunnelEngineProxy.getJobStatus(jobEngineId);
+        return jobStatus == JobStatus.FINISHED
+                || jobStatus == JobStatus.CANCELED
+                || jobStatus == JobStatus.FAILED;
     }
 
     @Override
-    public String getJobStatus(@NonNull String jobEngineId) {
+    public JobStatus getJobStatus(@NonNull String jobEngineId) {
         return seaTunnelEngineProxy.getJobStatus(jobEngineId);
     }
 
@@ -293,13 +294,11 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor
                 return new HashMap<>();
             }
             JsonNode jsonNode = JsonUtils.stringToJsonNode(allJobMetricsContent);
-            Iterator<JsonNode> iterator = jsonNode.iterator();
-            while (iterator.hasNext()) {
+            for (JsonNode item : jsonNode) {
                 LinkedHashMap<Integer, JobMetrics> metricsMap = new LinkedHashMap<>();
-                JsonNode next = iterator.next();
 
-                JsonNode sourceReceivedCount = next.get("metrics").get("SourceReceivedCount");
-                Long jobEngineId = 0L;
+                JsonNode sourceReceivedCount = item.get("metrics").get("SourceReceivedCount");
+                long jobEngineId = 0L;
                 if (sourceReceivedCount != null && sourceReceivedCount.isArray()) {
                     for (JsonNode node : sourceReceivedCount) {
                         jobEngineId = node.get("tags").get("jobId").asLong();
@@ -311,7 +310,7 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor
                     }
                 }
 
-                JsonNode sinkWriteCount = next.get("metrics").get("SinkWriteCount");
+                JsonNode sinkWriteCount = item.get("metrics").get("SinkWriteCount");
                 if (sinkWriteCount != null && sinkWriteCount.isArray()) {
                     for (JsonNode node : sinkWriteCount) {
                         jobEngineId = node.get("tags").get("jobId").asLong();
@@ -324,7 +323,7 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor
                     }
                 }
 
-                JsonNode sinkWriteQPS = next.get("metrics").get("SinkWriteQPS");
+                JsonNode sinkWriteQPS = item.get("metrics").get("SinkWriteQPS");
                 if (sinkWriteQPS != null && sinkWriteQPS.isArray()) {
                     for (JsonNode node : sinkWriteQPS) {
                         Integer pipelineId = node.get("tags").get("pipelineId").asInt();
@@ -336,7 +335,7 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor
                     }
                 }
 
-                JsonNode sourceReceivedQPS = next.get("metrics").get("SourceReceivedQPS");
+                JsonNode sourceReceivedQPS = item.get("metrics").get("SourceReceivedQPS");
                 if (sourceReceivedQPS != null && sourceReceivedQPS.isArray()) {
                     for (JsonNode node : sourceReceivedQPS) {
                         Integer pipelineId = node.get("tags").get("pipelineId").asInt();
@@ -348,7 +347,7 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor
                     }
                 }
 
-                JsonNode cdcRecordEmitDelay = next.get("metrics").get("CDCRecordEmitDelay");
+                JsonNode cdcRecordEmitDelay = item.get("metrics").get("CDCRecordEmitDelay");
                 if (cdcRecordEmitDelay != null && cdcRecordEmitDelay.isArray()) {
                     Map<Integer, List<Long>> dataMap = new HashMap<>();
                     for (JsonNode node : cdcRecordEmitDelay) {
@@ -387,7 +386,7 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor
         JobMetrics currPipelineMetrics = metricsMap.get(pipelineId);
         if (currPipelineMetrics == null) {
             currPipelineMetrics = new JobMetrics();
-            currPipelineMetrics.setStatus("RUNNING");
+            currPipelineMetrics.setStatus(JobStatus.RUNNING);
             currPipelineMetrics.setPipelineId(pipelineId);
             metricsMap.put(pipelineId, currPipelineMetrics);
         }
@@ -402,7 +401,10 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor
         if (currPipelineMetrics == null) {
             currPipelineMetrics = new JobMetrics();
             metricsMap.put(pipelineId, currPipelineMetrics);
-            currPipelineMetrics.setStatus(jobPipelineStatus.get(pipelineId));
+            currPipelineMetrics.setStatus(
+                    "DEPLOYING".equals(jobPipelineStatus.get(pipelineId))
+                            ? JobStatus.SCHEDULED
+                            : JobStatus.valueOf(jobPipelineStatus.get(pipelineId)));
             currPipelineMetrics.setPipelineId(pipelineId);
         }
         return currPipelineMetrics;
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java
index 390ac627..554a37e9 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 
 import com.hazelcast.client.config.ClientConfig;
 import lombok.NonNull;
@@ -74,10 +75,10 @@ public class SeaTunnelEngineProxy {
         }
     }
 
-    public String getJobStatus(@NonNull String jobEngineId) {
+    public JobStatus getJobStatus(@NonNull String jobEngineId) {
         SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
         try {
-            return seaTunnelClient.getJobStatus(Long.valueOf(jobEngineId));
+            return JobStatus.valueOf(seaTunnelClient.getJobStatus(Long.valueOf(jobEngineId)));
         } catch (Exception e) {
             log.warn("Can not get job from engine.", e);
             return null;
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/IEngineMetricsExtractor.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/IEngineMetricsExtractor.java
index 528d848a..cee3af4e 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/IEngineMetricsExtractor.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/IEngineMetricsExtractor.java
@@ -18,6 +18,7 @@ package org.apache.seatunnel.app.thirdparty.metrics;
 
 import org.apache.seatunnel.app.dal.entity.JobInstanceHistory;
 import org.apache.seatunnel.app.dal.entity.JobMetrics;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 
 import lombok.NonNull;
 
@@ -39,7 +40,7 @@ public interface IEngineMetricsExtractor {
 
     List<Map<String, String>> getClusterHealthMetrics();
 
-    String getJobStatus(@NonNull String jobEngineId);
+    JobStatus getJobStatus(@NonNull String jobEngineId);
 
     /** Obtain all running task metrics in the engine cluster */
     Map<Long, HashMap<Integer, JobMetrics>> getAllRunningJobMetrics();
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
index b9801eca..bfa36830 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
 import org.apache.seatunnel.app.dal.entity.JobTask;
 import org.apache.seatunnel.app.domain.request.job.JobExecParam;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 
 import java.util.List;
 import java.util.Map;
@@ -98,9 +99,9 @@ public class JobUtils {
                         });
     }
 
-    public static boolean isJobEndStatus(String jobStatus) {
-        return "finished".equalsIgnoreCase(jobStatus)
-                || "canceled".equalsIgnoreCase(jobStatus)
-                || "failed".equalsIgnoreCase(jobStatus);
+    public static boolean isJobEndStatus(JobStatus jobStatus) {
+        return JobStatus.FINISHED == jobStatus
+                || JobStatus.CANCELED == jobStatus
+                || JobStatus.FAILED == jobStatus;
     }
 }
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
index 2ff5ef73..c963324c 100644
--- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.app.domain.response.job.JobConfigRes;
 import org.apache.seatunnel.app.domain.response.job.JobRes;
 import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
 import org.apache.seatunnel.app.utils.JobTestingUtils;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
 
 import org.junit.jupiter.api.AfterAll;
@@ -73,7 +74,7 @@ public class JobControllerTest {
         Result<List<JobPipelineDetailMetricsRes>> listResult =
                 JobTestingUtils.waitForJobCompletion(result.getData());
         assertEquals(1, listResult.getData().size());
-        assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+        assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus());
         assertEquals(5, listResult.getData().get(0).getReadRowCount());
         assertEquals(5, listResult.getData().get(0).getWriteRowCount());
     }
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
index 8cf6f999..6e1cb1ea 100644
--- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -73,7 +73,7 @@ public class JobExecutorControllerTest {
         Result<List<JobPipelineDetailMetricsRes>> listResult =
                 JobTestingUtils.waitForJobCompletion(result.getData());
         assertEquals(1, listResult.getData().size());
-        assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+        assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus());
         assertEquals(5, listResult.getData().get(0).getReadRowCount());
         assertEquals(5, listResult.getData().get(0).getWriteRowCount());
     }
@@ -88,7 +88,7 @@ public class JobExecutorControllerTest {
         Result<List<JobPipelineDetailMetricsRes>> listResult =
                 JobTestingUtils.waitForJobCompletion(result.getData());
         assertEquals(1, listResult.getData().size());
-        assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+        assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus());
         assertEquals(5, listResult.getData().get(0).getReadRowCount());
         assertEquals(5, listResult.getData().get(0).getWriteRowCount());
         String generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId));
@@ -122,7 +122,7 @@ public class JobExecutorControllerTest {
         assertTrue(result.getData() > 0);
         listResult = JobTestingUtils.waitForJobCompletion(result.getData());
         assertEquals(1, listResult.getData().size());
-        assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+        assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus());
         assertEquals(numberOfRecords, listResult.getData().get(0).getReadRowCount());
         assertEquals(numberOfRecords, listResult.getData().get(0).getWriteRowCount());
 
@@ -281,7 +281,7 @@ public class JobExecutorControllerTest {
         Result<JobExecutionStatus> jobExecutionStatusResult =
                 jobExecutorControllerWrapper.getJobExecutionStatus(jobInstanceId);
         assertTrue(jobExecutionStatusResult.isSuccess());
-        assertEquals(JobStatus.FAILED.name(), jobExecutionStatusResult.getData().getJobStatus());
+        assertEquals(JobStatus.FAILED, jobExecutionStatusResult.getData().getJobStatus());
         assertNotNull(jobExecutionStatusResult.getData().getErrorMessage());
 
         // Invalid jobInstanceId
@@ -306,7 +306,7 @@ public class JobExecutorControllerTest {
         Result<SeaTunnelJobInstanceDto> jobExecutionDetailResult =
                 jobExecutorControllerWrapper.getJobExecutionDetail(jobInstanceId);
         assertTrue(jobExecutionDetailResult.isSuccess());
-        assertEquals(JobStatus.FAILED.name(), jobExecutionDetailResult.getData().getJobStatus());
+        assertEquals(JobStatus.FAILED, jobExecutionDetailResult.getData().getJobStatus());
         assertNotNull(jobExecutionDetailResult.getData().getErrorMessage());
         assertNotNull(jobExecutionDetailResult.getData().getJobDefineName());
 
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java
index 485e07cc..b5dfd733 100644
--- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper;
 import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
 import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
 import org.apache.seatunnel.app.utils.JobTestingUtils;
+import org.apache.seatunnel.engine.core.job.JobStatus;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -75,7 +76,7 @@ public class TaskInstanceControllerTest extends SeatunnelWebTestingBase {
         Result<List<JobPipelineDetailMetricsRes>> listResult =
                 JobTestingUtils.waitForJobCompletion(execuitonResult.getData());
         assertEquals(1, listResult.getData().size());
-        assertEquals("FINISHED", listResult.getData().get(0).getStatus());
+        assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus());
     }
 
     @AfterAll
diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
index e0e2b8d5..f4d6c525 100644
--- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
+++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
@@ -99,9 +99,9 @@ public class JobTestingUtils {
             return false;
         }
         switch (metrics.getStatus()) {
-            case "FINISHED":
-            case "CANCELED":
-            case "FAILED":
+            case FINISHED:
+            case CANCELED:
+            case FAILED:
                 return true;
             default:
                 return false;


Reply via email to