This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push: new 94d8a394 [Improve][SeaTunnel-Web] Change JobStatus to enum type to avoid hard coding (#205) 94d8a394 is described below commit 94d8a394cb88e56c0cfdae1033154e96ab1c7789 Author: ChunFuWu <319355...@qq.com> AuthorDate: Sat Sep 7 16:32:20 2024 +0800 [Improve][SeaTunnel-Web] Change JobStatus to enum type to avoid hard coding (#205) --- .../seatunnel/app/dal/entity/JobInstance.java | 4 ++- .../seatunnel/app/dal/entity/JobMetrics.java | 4 ++- .../response/executor/JobExecutionStatus.java | 4 ++- .../metrics/JobPipelineDetailMetricsRes.java | 4 ++- .../metrics/JobPipelineSummaryMetricsRes.java | 4 ++- .../response/metrics/JobSummaryMetricsRes.java | 4 ++- .../app/domain/response/task/JobSimpleInfoRes.java | 4 ++- .../app/service/impl/JobExecutorServiceImpl.java | 9 +++-- .../app/service/impl/JobInstanceServiceImpl.java | 2 +- .../app/service/impl/JobMetricsServiceImpl.java | 42 +++++++++++----------- .../engine/SeaTunnelEngineMetricsExtractor.java | 34 +++++++++--------- .../thirdparty/engine/SeaTunnelEngineProxy.java | 5 +-- .../metrics/IEngineMetricsExtractor.java | 3 +- .../org/apache/seatunnel/app/utils/JobUtils.java | 9 ++--- .../seatunnel/app/test/JobControllerTest.java | 3 +- .../app/test/JobExecutorControllerTest.java | 10 +++--- .../app/test/TaskInstanceControllerTest.java | 3 +- .../seatunnel/app/utils/JobTestingUtils.java | 6 ++-- 18 files changed, 88 insertions(+), 66 deletions(-) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java index db5cbeba..221867d8 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.app.dal.entity; +import org.apache.seatunnel.engine.core.job.JobStatus; + import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; @@ -42,7 +44,7 @@ public class JobInstance { private Long jobDefineId; @TableField("job_status") - private String jobStatus; + private JobStatus jobStatus; @TableField("job_config") private String jobConfig; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetrics.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetrics.java index 633ad645..6706c592 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetrics.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetrics.java @@ -16,6 +16,8 @@ */ package org.apache.seatunnel.app.dal.entity; +import org.apache.seatunnel.engine.core.job.JobStatus; + import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; @@ -65,7 +67,7 @@ public class JobMetrics { private long recordDelay; @TableField("status") - private String status; + private JobStatus status; @TableField("create_user_id") private Integer createUserId; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java index 833abd88..8ab8c4c8 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/executor/JobExecutionStatus.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.app.domain.response.executor; +import org.apache.seatunnel.engine.core.job.JobStatus; + import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -26,7 +28,7 @@ import lombok.NoArgsConstructor; @NoArgsConstructor public class JobExecutionStatus { - private String jobStatus; + private JobStatus jobStatus; private String errorMessage; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineDetailMetricsRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineDetailMetricsRes.java index b864817c..a18c7c81 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineDetailMetricsRes.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineDetailMetricsRes.java @@ -16,6 +16,8 @@ */ package org.apache.seatunnel.app.domain.response.metrics; +import org.apache.seatunnel.engine.core.job.JobStatus; + import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -45,5 +47,5 @@ public class JobPipelineDetailMetricsRes { private long recordDelay; - private String status; + private JobStatus status; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineSummaryMetricsRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineSummaryMetricsRes.java index cac0499e..4c9a3211 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineSummaryMetricsRes.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobPipelineSummaryMetricsRes.java @@ -16,6 +16,8 @@ */ package org.apache.seatunnel.app.domain.response.metrics; +import org.apache.seatunnel.engine.core.job.JobStatus; + import lombok.AllArgsConstructor; import lombok.Data; @@ -28,5 +30,5 @@ public class JobPipelineSummaryMetricsRes { private long writeRowCount; - private String status; + private JobStatus status; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobSummaryMetricsRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobSummaryMetricsRes.java index d4b36fc5..0bb943b5 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobSummaryMetricsRes.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/metrics/JobSummaryMetricsRes.java @@ -16,6 +16,8 @@ */ package org.apache.seatunnel.app.domain.response.metrics; +import org.apache.seatunnel.engine.core.job.JobStatus; + import lombok.AllArgsConstructor; import lombok.Data; @@ -30,5 +32,5 @@ public class JobSummaryMetricsRes { private long writeRowCount; - private String status; + private JobStatus status; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/task/JobSimpleInfoRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/task/JobSimpleInfoRes.java index 22fda68c..d66c4640 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/task/JobSimpleInfoRes.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/task/JobSimpleInfoRes.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.app.domain.response.task; +import org.apache.seatunnel.engine.core.job.JobStatus; + import com.fasterxml.jackson.annotation.JsonProperty; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -40,7 +42,7 @@ public class JobSimpleInfoRes { private String jobName; @ApiModelProperty(value = "job status", dataType = "String") - private String jobStatus; + private JobStatus jobStatus; @ApiModelProperty(value = "job plan", dataType = "String") private String jobPlan; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java index 15250caa..ec1b37de 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java @@ -55,7 +55,6 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.util.Date; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -128,7 +127,7 @@ public class JobExecutorServiceImpl implements IJobExecutorService { } catch (Throwable e) { log.error("Job execution submission failed.", e); JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); - jobInstance.setJobStatus(JobStatus.FAILED.name()); + jobInstance.setJobStatus(JobStatus.FAILED); jobInstance.setEndTime(new Date()); String jobInstanceErrorMessage = JobUtils.getJobInstanceErrorMessage(e.getMessage()); jobInstance.setErrorMessage(jobInstanceErrorMessage); @@ -183,14 +182,14 @@ public class JobExecutorServiceImpl implements IJobExecutorService { @Override public Result<Void> jobPause(Integer userId, Long jobInstanceId) { JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); - if (Objects.equals( - getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId()), "RUNNING")) { + if (getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId()) + == JobStatus.RUNNING) { pauseJobInEngine(jobInstance.getJobEngineId()); } return Result.success(); } - private String getJobStatusFromEngine(@NonNull JobInstance jobInstance, String jobEngineId) { + private JobStatus getJobStatusFromEngine(@NonNull JobInstance jobInstance, String jobEngineId) { Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion()); diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java index 8dff7bff..d58d19b4 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java @@ -366,7 +366,7 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_COMPLETE, userId); JobInstance jobInstance = jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId); jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId); - jobInstance.setJobStatus(jobResult.getStatus().name()); + jobInstance.setJobStatus(jobResult.getStatus()); jobInstance.setJobEngineId(jobEngineId); jobInstance.setUpdateUserId(userId); jobInstance.setErrorMessage(JobUtils.getJobInstanceErrorMessage(jobResult.getError())); diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java index d65679b0..f424aee0 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java @@ -33,6 +33,7 @@ import org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy; import org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory; import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor; import org.apache.seatunnel.app.utils.JobUtils; +import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.server.common.CodeGenerateUtils; import org.apache.seatunnel.server.common.Constants; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; @@ -89,7 +90,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I Engine engine = new Engine(jobInstance.getEngineName(), jobInstance.getEngineVersion()); IEngineMetricsExtractor engineMetricsExtractor = (new EngineMetricsExtractorFactory(engine)).getEngineMetricsExtractor(); - String jobStatus = engineMetricsExtractor.getJobStatus(jobEngineId); + JobStatus jobStatus = engineMetricsExtractor.getJobStatus(jobEngineId); List<JobMetrics> jobPipelineDetailMetrics = getJobPipelineDetailMetrics(jobInstance, userId); @@ -160,8 +161,8 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I log.info("jobEngineId={}", jobInstance.getJobEngineId()); if (jobInstance.getJobStatus() == null - || jobInstance.getJobStatus().equals("FAILED") - || jobInstance.getJobStatus().equals("RUNNING")) { + || jobInstance.getJobStatus() == JobStatus.FAILED + || jobInstance.getJobStatus() == JobStatus.RUNNING) { // Obtain monitoring information from the collection of running jobs returned from // the engine if (!allRunningJobMetricsFromEngine.isEmpty() @@ -192,15 +193,15 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I if (jobMetricsFromDb != null) { jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromDb); } - if ("RUNNING".equals(jobInstance.getJobStatus())) { + if (jobInstance.getJobStatus() == JobStatus.RUNNING) { // Set the job status of jobInstance and jobMetrics in the database to // finished - jobInstance.setJobStatus("FINISHED"); + jobInstance.setJobStatus(JobStatus.FINISHED); jobInstanceDao.getJobInstanceMapper().updateById(jobInstance); } } - } else if (jobInstance.getJobStatus().equals("FINISHED") - || jobInstance.getJobStatus().equals("CANCELED")) { + } else if (jobInstance.getJobStatus() == JobStatus.FINISHED + || jobInstance.getJobStatus() == JobStatus.CANCELED) { // If the status of the job is finished or cancelled, the monitoring information is // directly obtained from MySQL JobSummaryMetricsRes jobMetricsFromDb = @@ -222,7 +223,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine, Map<Long, Long> jobInstanceIdAndJobEngineIdMap, Integer userId) { - jobInstance.setJobStatus("RUNNING"); + jobInstance.setJobStatus(JobStatus.RUNNING); HashMap<Integer, JobMetrics> jobMetricsFromEngine = allRunningJobMetricsFromEngine.get( jobInstanceIdAndJobEngineIdMap.get(jobInstance.getId())); @@ -246,7 +247,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I jobMetricsFromEngine .get(jobMetrics.getPipelineId()) .getWriteRowCount())); - jobMetricsFromDb.forEach(jobMetrics -> jobMetrics.setStatus("RUNNING")); + jobMetricsFromDb.forEach(jobMetrics -> jobMetrics.setStatus(JobStatus.RUNNING)); updateJobInstanceAndMetrics(jobInstance, jobMetricsFromDb); } @@ -265,7 +266,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I try { if (jobInstance.getJobStatus() != null - && jobInstance.getJobStatus().equals("CANCELED")) { + && jobInstance.getJobStatus() == JobStatus.CANCELED) { // If the status of the job is finished or cancelled // the monitoring information is directly obtained from MySQL JobSummaryMetricsRes jobMetricsFromDb = @@ -278,8 +279,8 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromDb); } else if (jobInstance.getJobStatus() != null - && (jobInstance.getJobStatus().equals("FINISHED") - || jobInstance.getJobStatus().equals("FAILED"))) { + && (jobInstance.getJobStatus() == JobStatus.FINISHED + || jobInstance.getJobStatus() == JobStatus.FAILED)) { // Obtain monitoring information from the collection of running jobs returned // from // the engine @@ -332,9 +333,9 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I jobInstance); jobSummaryMetricsResMap.put(jobInstance.getId(), jobMetricsFromEngineRes); } else { - String jobStatusByJobEngineId = null; + JobStatus jobStatus = null; try { - jobStatusByJobEngineId = + jobStatus = getJobStatusByJobEngineId( String.valueOf( jobInstanceIdAndJobEngineIdMap.get( @@ -345,8 +346,8 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I jobInstance.getId()); } - if (jobStatusByJobEngineId != null) { - jobInstance.setJobStatus(jobStatusByJobEngineId); + if (jobStatus != null) { + jobInstance.setJobStatus(jobStatus); jobInstanceDao.update(jobInstance); JobSummaryMetricsRes jobSummaryMetricsResByDb = getJobSummaryMetricsResByDb( @@ -365,7 +366,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I jobInstanceIdAndJobEngineIdMap.get( jobInstance.getId()))); if (!jobMetricsFromDb.isEmpty()) { - String finalJobStatusByJobEngineId = jobStatusByJobEngineId; + JobStatus finalJobStatusByJobEngineId = jobStatus; jobMetricsFromDb.forEach( jobMetrics -> jobMetrics.setStatus(finalJobStatusByJobEngineId)); @@ -402,7 +403,8 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I log.info("jobInstance={}", jobInstance); - return new JobSummaryMetricsRes(jobInstance.getId(), 1L, readCount, writeCount, "RUNNING"); + return new JobSummaryMetricsRes( + jobInstance.getId(), 1L, readCount, writeCount, JobStatus.RUNNING); } private JobSummaryMetricsRes getJobSummaryMetricsResByDb( @@ -442,7 +444,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I } } - private String getJobStatusByJobEngineId(String jobEngineId) { + private JobStatus getJobStatusByJobEngineId(String jobEngineId) { return SeaTunnelEngineProxy.getInstance().getJobStatus(jobEngineId); } @@ -581,7 +583,7 @@ public class JobMetricsServiceImpl extends SeatunnelBaseServiceImpl implements I jobMetricsDao.getJobMetricsMapper().insertBatchMetrics(jobMetricsFromEngine); } } else { - String jobStatus = getJobStatusByJobEngineId(jobEngineId); + JobStatus jobStatus = getJobStatusByJobEngineId(jobEngineId); for (JobMetrics jobMetrics : jobMetricsFromDb) { Integer pipelineId = jobMetrics.getPipelineId(); JobMetrics currentPiplinejobMetricsFromEngine = diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java index 7f8053fc..b9003d82 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor; import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.engine.core.job.JobDAGInfo; +import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; import org.apache.seatunnel.server.common.SeatunnelException; @@ -161,14 +162,14 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor @Override public boolean isJobEnd(@NonNull String jobEngineId) { - String jobStatus = seaTunnelEngineProxy.getJobStatus(jobEngineId); - return "finished".equalsIgnoreCase(jobStatus) - || "canceled".equalsIgnoreCase(jobStatus) - || "failed".equalsIgnoreCase(jobStatus); + JobStatus jobStatus = seaTunnelEngineProxy.getJobStatus(jobEngineId); + return jobStatus == JobStatus.FINISHED + || jobStatus == JobStatus.CANCELED + || jobStatus == JobStatus.FAILED; } @Override - public String getJobStatus(@NonNull String jobEngineId) { + public JobStatus getJobStatus(@NonNull String jobEngineId) { return seaTunnelEngineProxy.getJobStatus(jobEngineId); } @@ -293,13 +294,11 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor return new HashMap<>(); } JsonNode jsonNode = JsonUtils.stringToJsonNode(allJobMetricsContent); - Iterator<JsonNode> iterator = jsonNode.iterator(); - while (iterator.hasNext()) { + for (JsonNode item : jsonNode) { LinkedHashMap<Integer, JobMetrics> metricsMap = new LinkedHashMap<>(); - JsonNode next = iterator.next(); - JsonNode sourceReceivedCount = next.get("metrics").get("SourceReceivedCount"); - Long jobEngineId = 0L; + JsonNode sourceReceivedCount = item.get("metrics").get("SourceReceivedCount"); + long jobEngineId = 0L; if (sourceReceivedCount != null && sourceReceivedCount.isArray()) { for (JsonNode node : sourceReceivedCount) { jobEngineId = node.get("tags").get("jobId").asLong(); @@ -311,7 +310,7 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor } } - JsonNode sinkWriteCount = next.get("metrics").get("SinkWriteCount"); + JsonNode sinkWriteCount = item.get("metrics").get("SinkWriteCount"); if (sinkWriteCount != null && sinkWriteCount.isArray()) { for (JsonNode node : sinkWriteCount) { jobEngineId = node.get("tags").get("jobId").asLong(); @@ -324,7 +323,7 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor } } - JsonNode sinkWriteQPS = next.get("metrics").get("SinkWriteQPS"); + JsonNode sinkWriteQPS = item.get("metrics").get("SinkWriteQPS"); if (sinkWriteQPS != null && sinkWriteQPS.isArray()) { for (JsonNode node : sinkWriteQPS) { Integer pipelineId = node.get("tags").get("pipelineId").asInt(); @@ -336,7 +335,7 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor } } - JsonNode sourceReceivedQPS = next.get("metrics").get("SourceReceivedQPS"); + JsonNode sourceReceivedQPS = item.get("metrics").get("SourceReceivedQPS"); if (sourceReceivedQPS != null && sourceReceivedQPS.isArray()) { for (JsonNode node : sourceReceivedQPS) { Integer pipelineId = node.get("tags").get("pipelineId").asInt(); @@ -348,7 +347,7 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor } } - JsonNode cdcRecordEmitDelay = next.get("metrics").get("CDCRecordEmitDelay"); + JsonNode cdcRecordEmitDelay = item.get("metrics").get("CDCRecordEmitDelay"); if (cdcRecordEmitDelay != null && cdcRecordEmitDelay.isArray()) { Map<Integer, List<Long>> dataMap = new HashMap<>(); for (JsonNode node : cdcRecordEmitDelay) { @@ -387,7 +386,7 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor JobMetrics currPipelineMetrics = metricsMap.get(pipelineId); if (currPipelineMetrics == null) { currPipelineMetrics = new JobMetrics(); - currPipelineMetrics.setStatus("RUNNING"); + currPipelineMetrics.setStatus(JobStatus.RUNNING); currPipelineMetrics.setPipelineId(pipelineId); metricsMap.put(pipelineId, currPipelineMetrics); } @@ -402,7 +401,10 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor if (currPipelineMetrics == null) { currPipelineMetrics = new JobMetrics(); metricsMap.put(pipelineId, currPipelineMetrics); - currPipelineMetrics.setStatus(jobPipelineStatus.get(pipelineId)); + currPipelineMetrics.setStatus( + "DEPLOYING".equals(jobPipelineStatus.get(pipelineId)) + ? JobStatus.SCHEDULED + : JobStatus.valueOf(jobPipelineStatus.get(pipelineId))); currPipelineMetrics.setPipelineId(pipelineId); } return currPipelineMetrics; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java index 390ac627..554a37e9 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder; import org.apache.seatunnel.engine.core.job.JobDAGInfo; +import org.apache.seatunnel.engine.core.job.JobStatus; import com.hazelcast.client.config.ClientConfig; import lombok.NonNull; @@ -74,10 +75,10 @@ public class SeaTunnelEngineProxy { } } - public String getJobStatus(@NonNull String jobEngineId) { + public JobStatus getJobStatus(@NonNull String jobEngineId) { SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig); try { - return seaTunnelClient.getJobStatus(Long.valueOf(jobEngineId)); + return JobStatus.valueOf(seaTunnelClient.getJobStatus(Long.valueOf(jobEngineId))); } catch (Exception e) { log.warn("Can not get job from engine.", e); return null; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/IEngineMetricsExtractor.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/IEngineMetricsExtractor.java index 528d848a..cee3af4e 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/IEngineMetricsExtractor.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/metrics/IEngineMetricsExtractor.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.app.thirdparty.metrics; import org.apache.seatunnel.app.dal.entity.JobInstanceHistory; import org.apache.seatunnel.app.dal.entity.JobMetrics; +import org.apache.seatunnel.engine.core.job.JobStatus; import lombok.NonNull; @@ -39,7 +40,7 @@ public interface IEngineMetricsExtractor { List<Map<String, String>> getClusterHealthMetrics(); - String getJobStatus(@NonNull String jobEngineId); + JobStatus getJobStatus(@NonNull String jobEngineId); /** Obtain all running task metrics in the engine cluster */ Map<Long, HashMap<Integer, JobMetrics>> getAllRunningJobMetrics(); diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java index b9801eca..bfa36830 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; import org.apache.seatunnel.app.dal.entity.JobTask; import org.apache.seatunnel.app.domain.request.job.JobExecParam; +import org.apache.seatunnel.engine.core.job.JobStatus; import java.util.List; import java.util.Map; @@ -98,9 +99,9 @@ public class JobUtils { }); } - public static boolean isJobEndStatus(String jobStatus) { - return "finished".equalsIgnoreCase(jobStatus) - || "canceled".equalsIgnoreCase(jobStatus) - || "failed".equalsIgnoreCase(jobStatus); + public static boolean isJobEndStatus(JobStatus jobStatus) { + return JobStatus.FINISHED == jobStatus + || JobStatus.CANCELED == jobStatus + || JobStatus.FAILED == jobStatus; } } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java index 2ff5ef73..c963324c 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.app.domain.response.job.JobConfigRes; import org.apache.seatunnel.app.domain.response.job.JobRes; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; import org.apache.seatunnel.app.utils.JobTestingUtils; +import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; import org.junit.jupiter.api.AfterAll; @@ -73,7 +74,7 @@ public class JobControllerTest { Result<List<JobPipelineDetailMetricsRes>> listResult = JobTestingUtils.waitForJobCompletion(result.getData()); assertEquals(1, listResult.getData().size()); - assertEquals("FINISHED", listResult.getData().get(0).getStatus()); + assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus()); assertEquals(5, listResult.getData().get(0).getReadRowCount()); assertEquals(5, listResult.getData().get(0).getWriteRowCount()); } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java index 8cf6f999..6e1cb1ea 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java @@ -73,7 +73,7 @@ public class JobExecutorControllerTest { Result<List<JobPipelineDetailMetricsRes>> listResult = JobTestingUtils.waitForJobCompletion(result.getData()); assertEquals(1, listResult.getData().size()); - assertEquals("FINISHED", listResult.getData().get(0).getStatus()); + assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus()); assertEquals(5, listResult.getData().get(0).getReadRowCount()); assertEquals(5, listResult.getData().get(0).getWriteRowCount()); } @@ -88,7 +88,7 @@ public class JobExecutorControllerTest { Result<List<JobPipelineDetailMetricsRes>> listResult = JobTestingUtils.waitForJobCompletion(result.getData()); assertEquals(1, listResult.getData().size()); - assertEquals("FINISHED", listResult.getData().get(0).getStatus()); + assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus()); assertEquals(5, listResult.getData().get(0).getReadRowCount()); assertEquals(5, listResult.getData().get(0).getWriteRowCount()); String generatedJobFile = getGenerateJobFile(String.valueOf(jobVersionId)); @@ -122,7 +122,7 @@ public class JobExecutorControllerTest { assertTrue(result.getData() > 0); listResult = JobTestingUtils.waitForJobCompletion(result.getData()); assertEquals(1, listResult.getData().size()); - assertEquals("FINISHED", listResult.getData().get(0).getStatus()); + assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus()); assertEquals(numberOfRecords, listResult.getData().get(0).getReadRowCount()); assertEquals(numberOfRecords, listResult.getData().get(0).getWriteRowCount()); @@ -281,7 +281,7 @@ public class JobExecutorControllerTest { Result<JobExecutionStatus> jobExecutionStatusResult = jobExecutorControllerWrapper.getJobExecutionStatus(jobInstanceId); assertTrue(jobExecutionStatusResult.isSuccess()); - assertEquals(JobStatus.FAILED.name(), jobExecutionStatusResult.getData().getJobStatus()); + assertEquals(JobStatus.FAILED, jobExecutionStatusResult.getData().getJobStatus()); assertNotNull(jobExecutionStatusResult.getData().getErrorMessage()); // Invalid jobInstanceId @@ -306,7 +306,7 @@ public class JobExecutorControllerTest { Result<SeaTunnelJobInstanceDto> jobExecutionDetailResult = jobExecutorControllerWrapper.getJobExecutionDetail(jobInstanceId); assertTrue(jobExecutionDetailResult.isSuccess()); - assertEquals(JobStatus.FAILED.name(), jobExecutionDetailResult.getData().getJobStatus()); + assertEquals(JobStatus.FAILED, jobExecutionDetailResult.getData().getJobStatus()); assertNotNull(jobExecutionDetailResult.getData().getErrorMessage()); assertNotNull(jobExecutionDetailResult.getData().getJobDefineName()); diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java index 485e07cc..b5dfd733 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper; import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; import org.apache.seatunnel.app.utils.JobTestingUtils; +import org.apache.seatunnel.engine.core.job.JobStatus; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -75,7 +76,7 @@ public class TaskInstanceControllerTest extends SeatunnelWebTestingBase { Result<List<JobPipelineDetailMetricsRes>> listResult = JobTestingUtils.waitForJobCompletion(execuitonResult.getData()); assertEquals(1, listResult.getData().size()); - assertEquals("FINISHED", listResult.getData().get(0).getStatus()); + assertEquals(JobStatus.FINISHED, listResult.getData().get(0).getStatus()); } @AfterAll diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java index e0e2b8d5..f4d6c525 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java @@ -99,9 +99,9 @@ public class JobTestingUtils { return false; } switch (metrics.getStatus()) { - case "FINISHED": - case "CANCELED": - case "FAILED": + case FINISHED: + case CANCELED: + case FAILED: return true; default: return false;