This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new 7140ad24 [Improvement] [Seatunnel-web] Improve code in 
DatasourceServiceImpl,SeaTunnelEngineMetricsExtractor etc. (#199)
7140ad24 is described below

commit 7140ad24b8c9a30c3cad7897eb77e84ca99f538a
Author: Mohammad Arshad <ars...@apache.org>
AuthorDate: Fri Aug 30 23:19:45 2024 +0530

    [Improvement] [Seatunnel-web] Improve code in 
DatasourceServiceImpl,SeaTunnelEngineMetricsExtractor etc. (#199)
---
 .../app/service/impl/DatasourceServiceImpl.java    |  33 +---
 .../app/service/impl/JobExecutorServiceImpl.java   |   6 +-
 .../app/service/impl/JobInstanceServiceImpl.java   |  30 ++-
 .../engine/SeaTunnelEngineMetricsExtractor.java    | 214 +++++++--------------
 .../utils/{JobExecParamUtil.java => JobUtils.java} |   2 +-
 .../seatunnel/app/test/JobControllerTest.java      |   4 +-
 .../app/test/JobExecutorControllerTest.java        |  26 +--
 .../app/test/JobMetricsControllerTest.java         |   4 +-
 .../app/test/TaskInstanceControllerTest.java       |   6 +-
 .../utils/{JobUtils.java => JobTestingUtils.java}  |   2 +-
 10 files changed, 114 insertions(+), 213 deletions(-)

diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java
index 4e290fc0..403e0420 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java
@@ -570,17 +570,7 @@ public class DatasourceServiceImpl extends 
SeatunnelBaseServiceImpl
 
         datasourceList.forEach(
                 datasource -> {
-                    DatasourceDetailRes datasourceDetailRes = new 
DatasourceDetailRes();
-                    datasourceDetailRes.setId(datasource.getId().toString());
-                    
datasourceDetailRes.setDatasourceName(datasource.getDatasourceName());
-                    
datasourceDetailRes.setPluginName(datasource.getPluginName());
-                    
datasourceDetailRes.setPluginVersion(datasource.getPluginVersion());
-                    
datasourceDetailRes.setDescription(datasource.getDescription());
-                    
datasourceDetailRes.setCreateTime(datasource.getCreateTime());
-                    
datasourceDetailRes.setUpdateTime(datasource.getUpdateTime());
-                    Map<String, String> config = 
JsonUtils.toMap(datasource.getDatasourceConfig());
-                    datasourceDetailRes.setDatasourceConfig(config);
-                    datasourceDetailResList.add(datasourceDetailRes);
+                    
datasourceDetailResList.add(getDatasourceDetailRes(datasource));
                 });
         return datasourceDetailResList;
     }
@@ -599,6 +589,10 @@ public class DatasourceServiceImpl extends 
SeatunnelBaseServiceImpl
         if (null == datasource) {
             throw new 
SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceName);
         }
+        return getDatasourceDetailRes(datasource);
+    }
+
+    private static DatasourceDetailRes getDatasourceDetailRes(Datasource 
datasource) {
         DatasourceDetailRes datasourceDetailRes = new DatasourceDetailRes();
         datasourceDetailRes.setId(datasource.getId().toString());
         datasourceDetailRes.setDatasourceName(datasource.getDatasourceName());
@@ -612,7 +606,6 @@ public class DatasourceServiceImpl extends 
SeatunnelBaseServiceImpl
                 JsonUtils.toMap(datasource.getDatasourceConfig(), 
String.class, String.class);
         // convert option rule
         datasourceDetailRes.setDatasourceConfig(datasourceConfig);
-
         return datasourceDetailRes;
     }
 
@@ -624,21 +617,7 @@ public class DatasourceServiceImpl extends 
SeatunnelBaseServiceImpl
         if (null == datasource) {
             throw new 
SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceId);
         }
-        DatasourceDetailRes datasourceDetailRes = new DatasourceDetailRes();
-        datasourceDetailRes.setId(datasource.getId().toString());
-        datasourceDetailRes.setDatasourceName(datasource.getDatasourceName());
-        datasourceDetailRes.setPluginName(datasource.getPluginName());
-        datasourceDetailRes.setPluginVersion(datasource.getPluginVersion());
-        datasourceDetailRes.setDescription(datasource.getDescription());
-        datasourceDetailRes.setCreateTime(datasource.getCreateTime());
-        datasourceDetailRes.setUpdateTime(datasource.getUpdateTime());
-
-        Map<String, String> datasourceConfig =
-                JsonUtils.toMap(datasource.getDatasourceConfig(), 
String.class, String.class);
-        // convert option rule
-        datasourceDetailRes.setDatasourceConfig(datasourceConfig);
-
-        return datasourceDetailRes;
+        return getDatasourceDetailRes(datasource);
     }
 
     @Override
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index e7490347..15250caa 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -28,7 +28,7 @@ import org.apache.seatunnel.app.service.IJobInstanceService;
 import org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy;
 import 
org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory;
 import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
-import org.apache.seatunnel.app.utils.JobExecParamUtil;
+import org.apache.seatunnel.app.utils.JobUtils;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
@@ -130,8 +130,7 @@ public class JobExecutorServiceImpl implements 
IJobExecutorService {
             JobInstance jobInstance = 
jobInstanceDao.getJobInstance(jobInstanceId);
             jobInstance.setJobStatus(JobStatus.FAILED.name());
             jobInstance.setEndTime(new Date());
-            String jobInstanceErrorMessage =
-                    
JobExecParamUtil.getJobInstanceErrorMessage(e.getMessage());
+            String jobInstanceErrorMessage = 
JobUtils.getJobInstanceErrorMessage(e.getMessage());
             jobInstance.setErrorMessage(jobInstanceErrorMessage);
             jobInstanceDao.update(jobInstance);
             throw new RuntimeException(e.getMessage(), e);
@@ -178,7 +177,6 @@ public class JobExecutorServiceImpl implements 
IJobExecutorService {
 
     private SeaTunnelClient createSeaTunnelClient() {
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
-        clientConfig.setClusterName(clientConfig.getClusterName());
         return new SeaTunnelClient(clientConfig);
     }
 
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
index 68f7ee00..8dff7bff 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java
@@ -50,6 +50,7 @@ import 
org.apache.seatunnel.app.domain.request.job.SelectTableFields;
 import org.apache.seatunnel.app.domain.request.job.TableSchemaReq;
 import org.apache.seatunnel.app.domain.request.job.transform.Transform;
 import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions;
+import org.apache.seatunnel.app.domain.response.datasource.DatasourceDetailRes;
 import 
org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
 import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
 import 
org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant;
@@ -59,7 +60,7 @@ import org.apache.seatunnel.app.service.IJobMetricsService;
 import org.apache.seatunnel.app.service.IVirtualTableService;
 import 
org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcherUtils;
 import 
org.apache.seatunnel.app.thirdparty.transfrom.TransformConfigSwitcherUtils;
-import org.apache.seatunnel.app.utils.JobExecParamUtil;
+import org.apache.seatunnel.app.utils.JobUtils;
 import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
@@ -173,8 +174,8 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
         BusinessMode businessMode =
                 
BusinessMode.valueOf(jobDefinitionDao.getJob(jobId).getJobType());
         Config envConfig = filterEmptyValue(ConfigFactory.parseString(envStr));
-        envConfig = JobExecParamUtil.updateEnvConfig(executeParam, envConfig);
-        JobExecParamUtil.updateDataSource(executeParam, tasks);
+        envConfig = JobUtils.updateEnvConfig(executeParam, envConfig);
+        JobUtils.updateDataSource(executeParam, tasks);
 
         Map<String, List<Config>> sourceMap = new LinkedHashMap<>();
         Map<String, List<Config>> transformMap = new LinkedHashMap<>();
@@ -230,8 +231,7 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                             }
 
                             config =
-                                    JobExecParamUtil.updateTaskConfig(
-                                            executeParam, config, 
task.getName());
+                                    JobUtils.updateTaskConfig(executeParam, 
config, task.getName());
                             Config mergeConfig =
                                     mergeTaskConfig(
                                             task,
@@ -241,7 +241,7 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                                             config,
                                             optionRule);
                             mergeConfig =
-                                    JobExecParamUtil.updateQueryTaskConfig(
+                                    JobUtils.updateQueryTaskConfig(
                                             executeParam, mergeConfig, 
task.getName());
                             sourceMap
                                     .get(task.getConnectorType())
@@ -273,7 +273,7 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                         List<TableSchemaReq> inputSchemas = 
findInputSchemas(tasks, lines, task);
                         Config transformConfig = buildTransformConfig(task, 
config, inputSchemas);
                         transformConfig =
-                                JobExecParamUtil.updateTaskConfig(
+                                JobUtils.updateTaskConfig(
                                         executeParam, transformConfig, 
task.getName());
                         transformMap
                                 .get(task.getConnectorType())
@@ -290,8 +290,7 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
                                 sinkMap.put(task.getConnectorType(), new 
ArrayList<>());
                             }
                             config =
-                                    JobExecParamUtil.updateTaskConfig(
-                                            executeParam, config, 
task.getName());
+                                    JobUtils.updateTaskConfig(executeParam, 
config, task.getName());
                             Config mergeConfig =
                                     mergeTaskConfig(
                                             task,
@@ -370,8 +369,7 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
         jobInstance.setJobStatus(jobResult.getStatus().name());
         jobInstance.setJobEngineId(jobEngineId);
         jobInstance.setUpdateUserId(userId);
-        jobInstance.setErrorMessage(
-                
JobExecParamUtil.getJobInstanceErrorMessage(jobResult.getError()));
+        
jobInstance.setErrorMessage(JobUtils.getJobInstanceErrorMessage(jobResult.getError()));
         jobInstanceDao.update(jobInstance);
     }
 
@@ -436,16 +434,14 @@ public class JobInstanceServiceImpl extends 
SeatunnelBaseServiceImpl
             throws JsonProcessingException {
 
         Long datasourceInstanceId = task.getDataSourceId();
-        String pluginName =
-                datasourceService
-                        
.queryDatasourceDetailById(datasourceInstanceId.toString())
-                        .getPluginName();
+        DatasourceDetailRes datasourceDetailRes =
+                
datasourceService.queryDatasourceDetailById(datasourceInstanceId.toString());
+        String pluginName = datasourceDetailRes.getPluginName();
         Config datasourceConf =
                 parseConfigWithOptionRule(
                         pluginType,
                         connectorType,
-                        datasourceService.queryDatasourceConfigById(
-                                datasourceInstanceId.toString()),
+                        datasourceDetailRes.getDatasourceConfig(),
                         optionRule);
 
         DataSourceOption dataSourceOption = null;
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java
index c8426293..0a7f35fd 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java
@@ -92,7 +92,6 @@ public class SeaTunnelEngineMetricsExtractor implements 
IEngineMetricsExtractor
 
     @Override
     public List<JobMetrics> getMetricsByJobEngineId(@NonNull String 
jobEngineId) {
-        LinkedHashMap<Integer, JobMetrics> metricsMap = new LinkedHashMap<>();
 
         LinkedHashMap<Integer, String> jobPipelineStatus = 
getJobPipelineStatus(jobEngineId);
         try {
@@ -103,83 +102,15 @@ public class SeaTunnelEngineMetricsExtractor implements 
IEngineMetricsExtractor
 
             JsonNode jsonNode =
                     
JsonUtils.stringToJsonNode(seaTunnelEngineProxy.getMetricsContent(jobEngineId));
-            JsonNode sourceReceivedCount = jsonNode.get("SourceReceivedCount");
-            if (sourceReceivedCount != null && sourceReceivedCount.isArray()) {
-                for (JsonNode node : sourceReceivedCount) {
-                    Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
-                    JobMetrics currPipelineMetrics =
-                            getOrCreatePipelineMetricsMap(
-                                    metricsMap, jobPipelineStatus, pipelineId);
-                    currPipelineMetrics.setReadRowCount(
-                            currPipelineMetrics.getReadRowCount() + 
node.get("value").asLong());
-                }
-            }
-
-            JsonNode sinkWriteCount = jsonNode.get("SinkWriteCount");
-            if (sinkWriteCount != null && sinkWriteCount.isArray()) {
-                for (JsonNode node : jsonNode.get("SinkWriteCount")) {
-                    Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
-                    JobMetrics currPipelineMetrics =
-                            getOrCreatePipelineMetricsMap(
-                                    metricsMap, jobPipelineStatus, pipelineId);
-                    currPipelineMetrics.setWriteRowCount(
-                            currPipelineMetrics.getWriteRowCount() + 
node.get("value").asLong());
-                }
-            }
-
-            JsonNode sinkWriteQPS = jsonNode.get("SinkWriteQPS");
-            if (sinkWriteQPS != null && sinkWriteQPS.isArray()) {
-                for (JsonNode node : jsonNode.get("SinkWriteQPS")) {
-                    Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
-                    JobMetrics currPipelineMetrics =
-                            getOrCreatePipelineMetricsMap(
-                                    metricsMap, jobPipelineStatus, pipelineId);
-                    currPipelineMetrics.setWriteQps(
-                            currPipelineMetrics.getWriteQps()
-                                    + (new 
Double(node.get("value").asDouble())).longValue());
-                }
-            }
-
-            JsonNode sourceReceivedQPS = jsonNode.get("SourceReceivedQPS");
-            if (sourceReceivedQPS != null && sourceReceivedQPS.isArray()) {
-                for (JsonNode node : jsonNode.get("SourceReceivedQPS")) {
-                    Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
-                    JobMetrics currPipelineMetrics =
-                            getOrCreatePipelineMetricsMap(
-                                    metricsMap, jobPipelineStatus, pipelineId);
-                    currPipelineMetrics.setReadQps(
-                            currPipelineMetrics.getReadQps()
-                                    + (new 
Double(node.get("value").asDouble())).longValue());
-                }
-            }
-
-            JsonNode cdcRecordEmitDelay = jsonNode.get("CDCRecordEmitDelay");
-            if (cdcRecordEmitDelay != null && cdcRecordEmitDelay.isArray()) {
-                Map<Integer, List<Long>> dataMap = new HashMap<>();
-                for (JsonNode node : jsonNode.get("CDCRecordEmitDelay")) {
-                    Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
-                    long value = node.get("value").asLong();
-                    dataMap.computeIfAbsent(pipelineId, n -> new 
ArrayList<>()).add(value);
-                }
-                dataMap.forEach(
-                        (key, value) -> {
-                            JobMetrics currPipelineMetrics =
-                                    getOrCreatePipelineMetricsMap(
-                                            metricsMap, jobPipelineStatus, 
key);
-                            OptionalDouble average = 
value.stream().mapToDouble(a -> a).average();
-                            currPipelineMetrics.setRecordDelay(
-                                    Double.valueOf(average.isPresent() ? 
average.getAsDouble() : 0)
-                                            .longValue());
-                        });
-            }
+            LinkedHashMap<Integer, JobMetrics> metricsMap =
+                    extractMetrics(jobPipelineStatus, jsonNode);
+            return Arrays.asList(metricsMap.values().toArray(new 
JobMetrics[0]));
         } catch (JsonProcessingException e) {
             throw new SeatunnelException(
                     SeatunnelErrorEnum.LOAD_ENGINE_METRICS_JSON_ERROR,
                     "SeaTunnel",
                     ExceptionUtils.getMessage(e));
         }
-
-        return Arrays.asList(metricsMap.values().toArray(new JobMetrics[0]));
     }
 
     @Override
@@ -267,8 +198,6 @@ public class SeaTunnelEngineMetricsExtractor implements 
IEngineMetricsExtractor
 
     @Override
     public Map<Integer, JobMetrics> getMetricsByJobEngineIdRTMap(@NonNull 
String jobEngineId) {
-        LinkedHashMap<Integer, JobMetrics> metricsMap = new LinkedHashMap<>();
-
         LinkedHashMap<Integer, String> jobPipelineStatus = 
getJobPipelineStatus(jobEngineId);
         try {
             String metricsContent = 
seaTunnelEngineProxy.getMetricsContent(jobEngineId);
@@ -276,84 +205,83 @@ public class SeaTunnelEngineMetricsExtractor implements 
IEngineMetricsExtractor
                 return new HashMap<>();
             }
 
-            JsonNode jsonNode =
-                    
JsonUtils.stringToJsonNode(seaTunnelEngineProxy.getMetricsContent(jobEngineId));
-            JsonNode sourceReceivedCount = jsonNode.get("SourceReceivedCount");
-            if (sourceReceivedCount != null && sourceReceivedCount.isArray()) {
-                for (JsonNode node : sourceReceivedCount) {
-                    Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
-                    JobMetrics currPipelineMetrics =
-                            getOrCreatePipelineMetricsMap(
-                                    metricsMap, jobPipelineStatus, pipelineId);
-                    currPipelineMetrics.setReadRowCount(
-                            currPipelineMetrics.getReadRowCount() + 
node.get("value").asLong());
-                }
-            }
+            JsonNode jsonNode = JsonUtils.stringToJsonNode(metricsContent);
+            return extractMetrics(jobPipelineStatus, jsonNode);
+        } catch (JsonProcessingException e) {
+            throw new SeatunnelException(
+                    SeatunnelErrorEnum.LOAD_ENGINE_METRICS_JSON_ERROR,
+                    "SeaTunnel",
+                    ExceptionUtils.getMessage(e));
+        }
+    }
 
-            JsonNode sinkWriteCount = jsonNode.get("SinkWriteCount");
-            if (sinkWriteCount != null && sinkWriteCount.isArray()) {
-                for (JsonNode node : jsonNode.get("SinkWriteCount")) {
-                    Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
-                    JobMetrics currPipelineMetrics =
-                            getOrCreatePipelineMetricsMap(
-                                    metricsMap, jobPipelineStatus, pipelineId);
-                    currPipelineMetrics.setWriteRowCount(
-                            currPipelineMetrics.getWriteRowCount() + 
node.get("value").asLong());
-                }
+    private LinkedHashMap<Integer, JobMetrics> extractMetrics(
+            LinkedHashMap<Integer, String> jobPipelineStatus, JsonNode 
jsonNode) {
+        LinkedHashMap<Integer, JobMetrics> metricsMap = new LinkedHashMap<>();
+        JsonNode sourceReceivedCount = jsonNode.get("SourceReceivedCount");
+        if (sourceReceivedCount != null && sourceReceivedCount.isArray()) {
+            for (JsonNode node : sourceReceivedCount) {
+                Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
+                JobMetrics currPipelineMetrics =
+                        getOrCreatePipelineMetricsMap(metricsMap, 
jobPipelineStatus, pipelineId);
+                currPipelineMetrics.setReadRowCount(
+                        currPipelineMetrics.getReadRowCount() + 
node.get("value").asLong());
             }
+        }
 
-            JsonNode sinkWriteQPS = jsonNode.get("SinkWriteQPS");
-            if (sinkWriteQPS != null && sinkWriteQPS.isArray()) {
-                for (JsonNode node : jsonNode.get("SinkWriteQPS")) {
-                    Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
-                    JobMetrics currPipelineMetrics =
-                            getOrCreatePipelineMetricsMap(
-                                    metricsMap, jobPipelineStatus, pipelineId);
-                    currPipelineMetrics.setWriteQps(
-                            currPipelineMetrics.getWriteQps()
-                                    + (new 
Double(node.get("value").asDouble())).longValue());
-                }
+        JsonNode sinkWriteCount = jsonNode.get("SinkWriteCount");
+        if (sinkWriteCount != null && sinkWriteCount.isArray()) {
+            for (JsonNode node : jsonNode.get("SinkWriteCount")) {
+                Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
+                JobMetrics currPipelineMetrics =
+                        getOrCreatePipelineMetricsMap(metricsMap, 
jobPipelineStatus, pipelineId);
+                currPipelineMetrics.setWriteRowCount(
+                        currPipelineMetrics.getWriteRowCount() + 
node.get("value").asLong());
             }
+        }
 
-            JsonNode sourceReceivedQPS = jsonNode.get("SourceReceivedQPS");
-            if (sourceReceivedQPS != null && sourceReceivedQPS.isArray()) {
-                for (JsonNode node : jsonNode.get("SourceReceivedQPS")) {
-                    Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
-                    JobMetrics currPipelineMetrics =
-                            getOrCreatePipelineMetricsMap(
-                                    metricsMap, jobPipelineStatus, pipelineId);
-                    currPipelineMetrics.setReadQps(
-                            currPipelineMetrics.getReadQps()
-                                    + (new 
Double(node.get("value").asDouble())).longValue());
-                }
+        JsonNode sinkWriteQPS = jsonNode.get("SinkWriteQPS");
+        if (sinkWriteQPS != null && sinkWriteQPS.isArray()) {
+            for (JsonNode node : jsonNode.get("SinkWriteQPS")) {
+                Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
+                JobMetrics currPipelineMetrics =
+                        getOrCreatePipelineMetricsMap(metricsMap, 
jobPipelineStatus, pipelineId);
+                currPipelineMetrics.setWriteQps(
+                        currPipelineMetrics.getWriteQps()
+                                + (new 
Double(node.get("value").asDouble())).longValue());
             }
+        }
 
-            JsonNode cdcRecordEmitDelay = jsonNode.get("CDCRecordEmitDelay");
-            if (cdcRecordEmitDelay != null && cdcRecordEmitDelay.isArray()) {
-                Map<Integer, List<Long>> dataMap = new HashMap<>();
-                for (JsonNode node : jsonNode.get("CDCRecordEmitDelay")) {
-                    Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
-                    long value = node.get("value").asLong();
-                    dataMap.computeIfAbsent(pipelineId, n -> new 
ArrayList<>()).add(value);
-                }
-                dataMap.forEach(
-                        (key, value) -> {
-                            JobMetrics currPipelineMetrics =
-                                    getOrCreatePipelineMetricsMap(
-                                            metricsMap, jobPipelineStatus, 
key);
-                            OptionalDouble average = 
value.stream().mapToDouble(a -> a).average();
-                            currPipelineMetrics.setRecordDelay(
-                                    Double.valueOf(average.isPresent() ? 
average.getAsDouble() : 0)
-                                            .longValue());
-                        });
+        JsonNode sourceReceivedQPS = jsonNode.get("SourceReceivedQPS");
+        if (sourceReceivedQPS != null && sourceReceivedQPS.isArray()) {
+            for (JsonNode node : jsonNode.get("SourceReceivedQPS")) {
+                Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
+                JobMetrics currPipelineMetrics =
+                        getOrCreatePipelineMetricsMap(metricsMap, 
jobPipelineStatus, pipelineId);
+                currPipelineMetrics.setReadQps(
+                        currPipelineMetrics.getReadQps()
+                                + (new 
Double(node.get("value").asDouble())).longValue());
             }
-        } catch (JsonProcessingException e) {
-            throw new SeatunnelException(
-                    SeatunnelErrorEnum.LOAD_ENGINE_METRICS_JSON_ERROR,
-                    "SeaTunnel",
-                    ExceptionUtils.getMessage(e));
         }
 
+        JsonNode cdcRecordEmitDelay = jsonNode.get("CDCRecordEmitDelay");
+        if (cdcRecordEmitDelay != null && cdcRecordEmitDelay.isArray()) {
+            Map<Integer, List<Long>> dataMap = new HashMap<>();
+            for (JsonNode node : jsonNode.get("CDCRecordEmitDelay")) {
+                Integer pipelineId = 
node.get("tags").get("pipelineId").asInt();
+                long value = node.get("value").asLong();
+                dataMap.computeIfAbsent(pipelineId, n -> new 
ArrayList<>()).add(value);
+            }
+            dataMap.forEach(
+                    (key, value) -> {
+                        JobMetrics currPipelineMetrics =
+                                getOrCreatePipelineMetricsMap(metricsMap, 
jobPipelineStatus, key);
+                        OptionalDouble average = value.stream().mapToDouble(a 
-> a).average();
+                        currPipelineMetrics.setRecordDelay(
+                                Double.valueOf(average.isPresent() ? 
average.getAsDouble() : 0)
+                                        .longValue());
+                    });
+        }
         return metricsMap;
     }
 
@@ -452,7 +380,7 @@ public class SeaTunnelEngineMetricsExtractor implements 
IEngineMetricsExtractor
             }
 
         } catch (Exception e) {
-            e.printStackTrace();
+            log.error("Failed to fetch running job metrics", e);
         }
         return allRunningJobMetricsHashMap;
     }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
similarity index 99%
rename from 
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
rename to 
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
index 497524df..86defdd0 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java
@@ -25,7 +25,7 @@ import 
org.apache.seatunnel.app.domain.request.job.JobExecParam;
 import java.util.List;
 import java.util.Map;
 
-public class JobExecParamUtil {
+public class JobUtils {
 
     // The maximum length of the job execution error message, 4KB
     private static final int ERROR_MESSAGE_MAX_LENGTH = 4096;
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
index b898ebd9..2ff5ef73 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java
@@ -28,7 +28,7 @@ import 
org.apache.seatunnel.app.domain.request.job.PluginConfig;
 import org.apache.seatunnel.app.domain.response.job.JobConfigRes;
 import org.apache.seatunnel.app.domain.response.job.JobRes;
 import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
-import org.apache.seatunnel.app.utils.JobUtils;
+import org.apache.seatunnel.app.utils.JobTestingUtils;
 import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
 
 import org.junit.jupiter.api.AfterAll;
@@ -71,7 +71,7 @@ public class JobControllerTest {
         assertTrue(result.isSuccess());
         assertTrue(result.getData() > 0);
         Result<List<JobPipelineDetailMetricsRes>> listResult =
-                JobUtils.waitForJobCompletion(result.getData());
+                JobTestingUtils.waitForJobCompletion(result.getData());
         assertEquals(1, listResult.getData().size());
         assertEquals("FINISHED", listResult.getData().get(0).getStatus());
         assertEquals(5, listResult.getData().get(0).getReadRowCount());
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
index 50a4e419..96adebe2 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -29,7 +29,7 @@ import 
org.apache.seatunnel.app.domain.request.job.PluginConfig;
 import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus;
 import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
 import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
-import org.apache.seatunnel.app.utils.JobUtils;
+import org.apache.seatunnel.app.utils.JobTestingUtils;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 
 import org.junit.jupiter.api.AfterAll;
@@ -66,12 +66,12 @@ public class JobExecutorControllerTest {
     @Test
     public void executeJob_shouldReturnSuccess_whenValidRequest() {
         String jobName = "execJob" + uniqueId;
-        long jobVersionId = JobUtils.createJob(jobName);
+        long jobVersionId = JobTestingUtils.createJob(jobName);
         Result<Long> result = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
         assertTrue(result.isSuccess());
         assertTrue(result.getData() > 0);
         Result<List<JobPipelineDetailMetricsRes>> listResult =
-                JobUtils.waitForJobCompletion(result.getData());
+                JobTestingUtils.waitForJobCompletion(result.getData());
         assertEquals(1, listResult.getData().size());
         assertEquals("FINISHED", listResult.getData().get(0).getStatus());
         assertEquals(5, listResult.getData().get(0).getReadRowCount());
@@ -81,12 +81,12 @@ public class JobExecutorControllerTest {
     @Test
     public void executeJobWithParameters() {
         String jobName = "execJobWithParam" + uniqueId;
-        long jobVersionId = JobUtils.createJob(jobName);
+        long jobVersionId = JobTestingUtils.createJob(jobName);
         Result<Long> result = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
         assertTrue(result.isSuccess());
         assertTrue(result.getData() > 0);
         Result<List<JobPipelineDetailMetricsRes>> listResult =
-                JobUtils.waitForJobCompletion(result.getData());
+                JobTestingUtils.waitForJobCompletion(result.getData());
         assertEquals(1, listResult.getData().size());
         assertEquals("FINISHED", listResult.getData().get(0).getStatus());
         assertEquals(5, listResult.getData().get(0).getReadRowCount());
@@ -120,7 +120,7 @@ public class JobExecutorControllerTest {
         result = jobExecutorControllerWrapper.jobExecutor(jobVersionId, 
jobExecParam);
         assertTrue(result.isSuccess());
         assertTrue(result.getData() > 0);
-        listResult = JobUtils.waitForJobCompletion(result.getData());
+        listResult = JobTestingUtils.waitForJobCompletion(result.getData());
         assertEquals(1, listResult.getData().size());
         assertEquals("FINISHED", listResult.getData().get(0).getStatus());
         assertEquals(numberOfRecords, 
listResult.getData().get(0).getReadRowCount());
@@ -138,7 +138,7 @@ public class JobExecutorControllerTest {
     @Test
     public void executeJobWithParameters_AllowQueryUpdate() {
         String jobName = "execJobUpdateQuery" + uniqueId;
-        JobCreateReq jobCreateReq = 
JobUtils.populateMySQLJobCreateReqFromFile();
+        JobCreateReq jobCreateReq = 
JobTestingUtils.populateMySQLJobCreateReqFromFile();
         jobCreateReq.getJobConfig().setName(jobName);
         jobCreateReq.getJobConfig().setDescription(jobName + " description");
         String datasourceName = "execJobUpdateQuery_db" + uniqueId;
@@ -184,7 +184,7 @@ public class JobExecutorControllerTest {
     @Test
     public void executeJobWithParameters_ChangeDatabase() {
         String jobName = "execJobChangeDatabase" + uniqueId;
-        JobCreateReq jobCreateReq = 
JobUtils.populateMySQLJobCreateReqFromFile();
+        JobCreateReq jobCreateReq = 
JobTestingUtils.populateMySQLJobCreateReqFromFile();
         jobCreateReq.getJobConfig().setName(jobName);
         jobCreateReq.getJobConfig().setDescription(jobName + " description");
         String datasourceName = "execJobChangeDatabase_db_1" + uniqueId;
@@ -241,7 +241,7 @@ public class JobExecutorControllerTest {
     @Test
     public void restoreJob_shouldReturnSuccess_whenValidRequest() {
         String jobName = "jobRestore" + uniqueId;
-        long jobVersionId = JobUtils.createJob(jobName);
+        long jobVersionId = JobTestingUtils.createJob(jobName);
         Result<Long> executorResult = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
         assertTrue(executorResult.isSuccess());
         Result<Void> result = 
jobExecutorControllerWrapper.jobRestore(executorResult.getData());
@@ -251,7 +251,7 @@ public class JobExecutorControllerTest {
     @Test
     public void getResource_shouldReturnSuccess_whenValidRequest() {
         String jobName = "getResource" + uniqueId;
-        long jobVersionId = JobUtils.createJob(jobName);
+        long jobVersionId = JobTestingUtils.createJob(jobName);
         Result<JobExecutorRes> result = 
jobExecutorControllerWrapper.resource(jobVersionId);
         assertTrue(result.isSuccess());
         assertNotNull(result.getData());
@@ -260,7 +260,7 @@ public class JobExecutorControllerTest {
     @Test
     public void executeJob_JobStatusUpdate_WhenSubmissionFailed() {
         String jobName = "execJobStatus" + uniqueId;
-        JobCreateReq jobCreateReq = 
JobUtils.populateMySQLJobCreateReqFromFile();
+        JobCreateReq jobCreateReq = 
JobTestingUtils.populateMySQLJobCreateReqFromFile();
         jobCreateReq.getJobConfig().setName(jobName);
         jobCreateReq.getJobConfig().setDescription(jobName + " description");
         String datasourceName = "execJobStatus_db_1" + uniqueId;
@@ -294,13 +294,13 @@ public class JobExecutorControllerTest {
     @Test
     public void storeErrorMessageWhenJobFailed() throws InterruptedException {
         String jobName = "failureCause" + uniqueId;
-        long jobVersionId = JobUtils.createJob(jobName, true);
+        long jobVersionId = JobTestingUtils.createJob(jobName, true);
         Result<Long> result = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
         // job submitted successfully but it will fail during execution
         assertTrue(result.isSuccess());
         assertTrue(result.getData() > 0);
         Long jobInstanceId = result.getData();
-        JobUtils.waitForJobCompletion(jobInstanceId);
+        JobTestingUtils.waitForJobCompletion(jobInstanceId);
         // extra second to let the data get updated in the database
         Thread.sleep(2000);
         Result<SeaTunnelJobInstanceDto> jobExecutionDetailResult =
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java
index 5086ba74..060056df 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java
@@ -22,7 +22,7 @@ import 
org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
 import org.apache.seatunnel.app.controller.JobMetricsControllerWrapper;
 import org.apache.seatunnel.app.domain.response.metrics.JobDAG;
 import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
-import org.apache.seatunnel.app.utils.JobUtils;
+import org.apache.seatunnel.app.utils.JobTestingUtils;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -58,7 +58,7 @@ public class JobMetricsControllerTest {
     }
 
     private static Long executeJob(String jobName) {
-        Long jobVersionId = JobUtils.createJob(jobName);
+        Long jobVersionId = JobTestingUtils.createJob(jobName);
         Result<Long> jobExecutionResult = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
         assertTrue(jobExecutionResult.isSuccess());
         return jobExecutionResult.getData();
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java
 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java
index 08d4aacd..c227fc7d 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java
@@ -24,7 +24,7 @@ import 
org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
 import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper;
 import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
 import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
-import org.apache.seatunnel.app.utils.JobUtils;
+import org.apache.seatunnel.app.utils.JobTestingUtils;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -53,11 +53,11 @@ public class TaskInstanceControllerTest extends 
SeatunnelWebTestingBase {
     @Test
     public void getTaskInstanceList_shouldReturnData_whenValidRequest() {
         String jobName = "getTaskInstance" + uniqueId;
-        long jobVersionId = JobUtils.createJob(jobName);
+        long jobVersionId = JobTestingUtils.createJob(jobName);
         Result<Long> execuitonResult = 
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
         assertTrue(execuitonResult.isSuccess());
         Result<List<JobPipelineDetailMetricsRes>> listResult =
-                JobUtils.waitForJobCompletion(execuitonResult.getData());
+                
JobTestingUtils.waitForJobCompletion(execuitonResult.getData());
         assertEquals(1, listResult.getData().size());
         assertEquals("FINISHED", listResult.getData().get(0).getStatus());
 
diff --git 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
similarity index 99%
rename from 
seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
rename to 
seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
index 2cce6317..e0e2b8d5 100644
--- 
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java
+++ 
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java
@@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class JobUtils {
+public class JobTestingUtils {
     private static JobMetricsControllerWrapper jobMetricsControllerWrapper =
             new JobMetricsControllerWrapper();
     private static JobConfigControllerWrapper jobConfigControllerWrapper =


Reply via email to