This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push: new 7140ad24 [Improvement] [Seatunnel-web] Improve code in DatasourceServiceImpl,SeaTunnelEngineMetricsExtractor etc. (#199) 7140ad24 is described below commit 7140ad24b8c9a30c3cad7897eb77e84ca99f538a Author: Mohammad Arshad <ars...@apache.org> AuthorDate: Fri Aug 30 23:19:45 2024 +0530 [Improvement] [Seatunnel-web] Improve code in DatasourceServiceImpl,SeaTunnelEngineMetricsExtractor etc. (#199) --- .../app/service/impl/DatasourceServiceImpl.java | 33 +--- .../app/service/impl/JobExecutorServiceImpl.java | 6 +- .../app/service/impl/JobInstanceServiceImpl.java | 30 ++- .../engine/SeaTunnelEngineMetricsExtractor.java | 214 +++++++-------------- .../utils/{JobExecParamUtil.java => JobUtils.java} | 2 +- .../seatunnel/app/test/JobControllerTest.java | 4 +- .../app/test/JobExecutorControllerTest.java | 26 +-- .../app/test/JobMetricsControllerTest.java | 4 +- .../app/test/TaskInstanceControllerTest.java | 6 +- .../utils/{JobUtils.java => JobTestingUtils.java} | 2 +- 10 files changed, 114 insertions(+), 213 deletions(-) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java index 4e290fc0..403e0420 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java @@ -570,17 +570,7 @@ public class DatasourceServiceImpl extends SeatunnelBaseServiceImpl datasourceList.forEach( datasource -> { - DatasourceDetailRes datasourceDetailRes = new DatasourceDetailRes(); - datasourceDetailRes.setId(datasource.getId().toString()); - datasourceDetailRes.setDatasourceName(datasource.getDatasourceName()); - datasourceDetailRes.setPluginName(datasource.getPluginName()); - datasourceDetailRes.setPluginVersion(datasource.getPluginVersion()); - datasourceDetailRes.setDescription(datasource.getDescription()); - datasourceDetailRes.setCreateTime(datasource.getCreateTime()); - datasourceDetailRes.setUpdateTime(datasource.getUpdateTime()); - Map<String, String> config = JsonUtils.toMap(datasource.getDatasourceConfig()); - datasourceDetailRes.setDatasourceConfig(config); - datasourceDetailResList.add(datasourceDetailRes); + datasourceDetailResList.add(getDatasourceDetailRes(datasource)); }); return datasourceDetailResList; } @@ -599,6 +589,10 @@ public class DatasourceServiceImpl extends SeatunnelBaseServiceImpl if (null == datasource) { throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceName); } + return getDatasourceDetailRes(datasource); + } + + private static DatasourceDetailRes getDatasourceDetailRes(Datasource datasource) { DatasourceDetailRes datasourceDetailRes = new DatasourceDetailRes(); datasourceDetailRes.setId(datasource.getId().toString()); datasourceDetailRes.setDatasourceName(datasource.getDatasourceName()); @@ -612,7 +606,6 @@ public class DatasourceServiceImpl extends SeatunnelBaseServiceImpl JsonUtils.toMap(datasource.getDatasourceConfig(), String.class, String.class); // convert option rule datasourceDetailRes.setDatasourceConfig(datasourceConfig); - return datasourceDetailRes; } @@ -624,21 +617,7 @@ public class DatasourceServiceImpl extends SeatunnelBaseServiceImpl if (null == datasource) { throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceId); } - DatasourceDetailRes datasourceDetailRes = new DatasourceDetailRes(); - datasourceDetailRes.setId(datasource.getId().toString()); - datasourceDetailRes.setDatasourceName(datasource.getDatasourceName()); - datasourceDetailRes.setPluginName(datasource.getPluginName()); - datasourceDetailRes.setPluginVersion(datasource.getPluginVersion()); - datasourceDetailRes.setDescription(datasource.getDescription()); - datasourceDetailRes.setCreateTime(datasource.getCreateTime()); - datasourceDetailRes.setUpdateTime(datasource.getUpdateTime()); - - Map<String, String> datasourceConfig = - JsonUtils.toMap(datasource.getDatasourceConfig(), String.class, String.class); - // convert option rule - datasourceDetailRes.setDatasourceConfig(datasourceConfig); - - return datasourceDetailRes; + return getDatasourceDetailRes(datasource); } @Override diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java index e7490347..15250caa 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java @@ -28,7 +28,7 @@ import org.apache.seatunnel.app.service.IJobInstanceService; import org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy; import org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory; import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor; -import org.apache.seatunnel.app.utils.JobExecParamUtil; +import org.apache.seatunnel.app.utils.JobUtils; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.engine.client.SeaTunnelClient; @@ -130,8 +130,7 @@ public class JobExecutorServiceImpl implements IJobExecutorService { JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); jobInstance.setJobStatus(JobStatus.FAILED.name()); jobInstance.setEndTime(new Date()); - String jobInstanceErrorMessage = - JobExecParamUtil.getJobInstanceErrorMessage(e.getMessage()); + String jobInstanceErrorMessage = JobUtils.getJobInstanceErrorMessage(e.getMessage()); jobInstance.setErrorMessage(jobInstanceErrorMessage); jobInstanceDao.update(jobInstance); throw new RuntimeException(e.getMessage(), e); @@ -178,7 +177,6 @@ public class JobExecutorServiceImpl implements IJobExecutorService { private SeaTunnelClient createSeaTunnelClient() { ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); - clientConfig.setClusterName(clientConfig.getClusterName()); return new SeaTunnelClient(clientConfig); } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java index 68f7ee00..8dff7bff 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java @@ -50,6 +50,7 @@ import org.apache.seatunnel.app.domain.request.job.SelectTableFields; import org.apache.seatunnel.app.domain.request.job.TableSchemaReq; import org.apache.seatunnel.app.domain.request.job.transform.Transform; import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions; +import org.apache.seatunnel.app.domain.response.datasource.DatasourceDetailRes; import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes; import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes; import org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant; @@ -59,7 +60,7 @@ import org.apache.seatunnel.app.service.IJobMetricsService; import org.apache.seatunnel.app.service.IVirtualTableService; import org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcherUtils; import org.apache.seatunnel.app.thirdparty.transfrom.TransformConfigSwitcherUtils; -import org.apache.seatunnel.app.utils.JobExecParamUtil; +import org.apache.seatunnel.app.utils.JobUtils; import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.utils.ExceptionUtils; @@ -173,8 +174,8 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl BusinessMode businessMode = BusinessMode.valueOf(jobDefinitionDao.getJob(jobId).getJobType()); Config envConfig = filterEmptyValue(ConfigFactory.parseString(envStr)); - envConfig = JobExecParamUtil.updateEnvConfig(executeParam, envConfig); - JobExecParamUtil.updateDataSource(executeParam, tasks); + envConfig = JobUtils.updateEnvConfig(executeParam, envConfig); + JobUtils.updateDataSource(executeParam, tasks); Map<String, List<Config>> sourceMap = new LinkedHashMap<>(); Map<String, List<Config>> transformMap = new LinkedHashMap<>(); @@ -230,8 +231,7 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl } config = - JobExecParamUtil.updateTaskConfig( - executeParam, config, task.getName()); + JobUtils.updateTaskConfig(executeParam, config, task.getName()); Config mergeConfig = mergeTaskConfig( task, @@ -241,7 +241,7 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl config, optionRule); mergeConfig = - JobExecParamUtil.updateQueryTaskConfig( + JobUtils.updateQueryTaskConfig( executeParam, mergeConfig, task.getName()); sourceMap .get(task.getConnectorType()) @@ -273,7 +273,7 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl List<TableSchemaReq> inputSchemas = findInputSchemas(tasks, lines, task); Config transformConfig = buildTransformConfig(task, config, inputSchemas); transformConfig = - JobExecParamUtil.updateTaskConfig( + JobUtils.updateTaskConfig( executeParam, transformConfig, task.getName()); transformMap .get(task.getConnectorType()) @@ -290,8 +290,7 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl sinkMap.put(task.getConnectorType(), new ArrayList<>()); } config = - JobExecParamUtil.updateTaskConfig( - executeParam, config, task.getName()); + JobUtils.updateTaskConfig(executeParam, config, task.getName()); Config mergeConfig = mergeTaskConfig( task, @@ -370,8 +369,7 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl jobInstance.setJobStatus(jobResult.getStatus().name()); jobInstance.setJobEngineId(jobEngineId); jobInstance.setUpdateUserId(userId); - jobInstance.setErrorMessage( - JobExecParamUtil.getJobInstanceErrorMessage(jobResult.getError())); + jobInstance.setErrorMessage(JobUtils.getJobInstanceErrorMessage(jobResult.getError())); jobInstanceDao.update(jobInstance); } @@ -436,16 +434,14 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl throws JsonProcessingException { Long datasourceInstanceId = task.getDataSourceId(); - String pluginName = - datasourceService - .queryDatasourceDetailById(datasourceInstanceId.toString()) - .getPluginName(); + DatasourceDetailRes datasourceDetailRes = + datasourceService.queryDatasourceDetailById(datasourceInstanceId.toString()); + String pluginName = datasourceDetailRes.getPluginName(); Config datasourceConf = parseConfigWithOptionRule( pluginType, connectorType, - datasourceService.queryDatasourceConfigById( - datasourceInstanceId.toString()), + datasourceDetailRes.getDatasourceConfig(), optionRule); DataSourceOption dataSourceOption = null; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java index c8426293..0a7f35fd 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineMetricsExtractor.java @@ -92,7 +92,6 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor @Override public List<JobMetrics> getMetricsByJobEngineId(@NonNull String jobEngineId) { - LinkedHashMap<Integer, JobMetrics> metricsMap = new LinkedHashMap<>(); LinkedHashMap<Integer, String> jobPipelineStatus = getJobPipelineStatus(jobEngineId); try { @@ -103,83 +102,15 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor JsonNode jsonNode = JsonUtils.stringToJsonNode(seaTunnelEngineProxy.getMetricsContent(jobEngineId)); - JsonNode sourceReceivedCount = jsonNode.get("SourceReceivedCount"); - if (sourceReceivedCount != null && sourceReceivedCount.isArray()) { - for (JsonNode node : sourceReceivedCount) { - Integer pipelineId = node.get("tags").get("pipelineId").asInt(); - JobMetrics currPipelineMetrics = - getOrCreatePipelineMetricsMap( - metricsMap, jobPipelineStatus, pipelineId); - currPipelineMetrics.setReadRowCount( - currPipelineMetrics.getReadRowCount() + node.get("value").asLong()); - } - } - - JsonNode sinkWriteCount = jsonNode.get("SinkWriteCount"); - if (sinkWriteCount != null && sinkWriteCount.isArray()) { - for (JsonNode node : jsonNode.get("SinkWriteCount")) { - Integer pipelineId = node.get("tags").get("pipelineId").asInt(); - JobMetrics currPipelineMetrics = - getOrCreatePipelineMetricsMap( - metricsMap, jobPipelineStatus, pipelineId); - currPipelineMetrics.setWriteRowCount( - currPipelineMetrics.getWriteRowCount() + node.get("value").asLong()); - } - } - - JsonNode sinkWriteQPS = jsonNode.get("SinkWriteQPS"); - if (sinkWriteQPS != null && sinkWriteQPS.isArray()) { - for (JsonNode node : jsonNode.get("SinkWriteQPS")) { - Integer pipelineId = node.get("tags").get("pipelineId").asInt(); - JobMetrics currPipelineMetrics = - getOrCreatePipelineMetricsMap( - metricsMap, jobPipelineStatus, pipelineId); - currPipelineMetrics.setWriteQps( - currPipelineMetrics.getWriteQps() - + (new Double(node.get("value").asDouble())).longValue()); - } - } - - JsonNode sourceReceivedQPS = jsonNode.get("SourceReceivedQPS"); - if (sourceReceivedQPS != null && sourceReceivedQPS.isArray()) { - for (JsonNode node : jsonNode.get("SourceReceivedQPS")) { - Integer pipelineId = node.get("tags").get("pipelineId").asInt(); - JobMetrics currPipelineMetrics = - getOrCreatePipelineMetricsMap( - metricsMap, jobPipelineStatus, pipelineId); - currPipelineMetrics.setReadQps( - currPipelineMetrics.getReadQps() - + (new Double(node.get("value").asDouble())).longValue()); - } - } - - JsonNode cdcRecordEmitDelay = jsonNode.get("CDCRecordEmitDelay"); - if (cdcRecordEmitDelay != null && cdcRecordEmitDelay.isArray()) { - Map<Integer, List<Long>> dataMap = new HashMap<>(); - for (JsonNode node : jsonNode.get("CDCRecordEmitDelay")) { - Integer pipelineId = node.get("tags").get("pipelineId").asInt(); - long value = node.get("value").asLong(); - dataMap.computeIfAbsent(pipelineId, n -> new ArrayList<>()).add(value); - } - dataMap.forEach( - (key, value) -> { - JobMetrics currPipelineMetrics = - getOrCreatePipelineMetricsMap( - metricsMap, jobPipelineStatus, key); - OptionalDouble average = value.stream().mapToDouble(a -> a).average(); - currPipelineMetrics.setRecordDelay( - Double.valueOf(average.isPresent() ? average.getAsDouble() : 0) - .longValue()); - }); - } + LinkedHashMap<Integer, JobMetrics> metricsMap = + extractMetrics(jobPipelineStatus, jsonNode); + return Arrays.asList(metricsMap.values().toArray(new JobMetrics[0])); } catch (JsonProcessingException e) { throw new SeatunnelException( SeatunnelErrorEnum.LOAD_ENGINE_METRICS_JSON_ERROR, "SeaTunnel", ExceptionUtils.getMessage(e)); } - - return Arrays.asList(metricsMap.values().toArray(new JobMetrics[0])); } @Override @@ -267,8 +198,6 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor @Override public Map<Integer, JobMetrics> getMetricsByJobEngineIdRTMap(@NonNull String jobEngineId) { - LinkedHashMap<Integer, JobMetrics> metricsMap = new LinkedHashMap<>(); - LinkedHashMap<Integer, String> jobPipelineStatus = getJobPipelineStatus(jobEngineId); try { String metricsContent = seaTunnelEngineProxy.getMetricsContent(jobEngineId); @@ -276,84 +205,83 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor return new HashMap<>(); } - JsonNode jsonNode = - JsonUtils.stringToJsonNode(seaTunnelEngineProxy.getMetricsContent(jobEngineId)); - JsonNode sourceReceivedCount = jsonNode.get("SourceReceivedCount"); - if (sourceReceivedCount != null && sourceReceivedCount.isArray()) { - for (JsonNode node : sourceReceivedCount) { - Integer pipelineId = node.get("tags").get("pipelineId").asInt(); - JobMetrics currPipelineMetrics = - getOrCreatePipelineMetricsMap( - metricsMap, jobPipelineStatus, pipelineId); - currPipelineMetrics.setReadRowCount( - currPipelineMetrics.getReadRowCount() + node.get("value").asLong()); - } - } + JsonNode jsonNode = JsonUtils.stringToJsonNode(metricsContent); + return extractMetrics(jobPipelineStatus, jsonNode); + } catch (JsonProcessingException e) { + throw new SeatunnelException( + SeatunnelErrorEnum.LOAD_ENGINE_METRICS_JSON_ERROR, + "SeaTunnel", + ExceptionUtils.getMessage(e)); + } + } - JsonNode sinkWriteCount = jsonNode.get("SinkWriteCount"); - if (sinkWriteCount != null && sinkWriteCount.isArray()) { - for (JsonNode node : jsonNode.get("SinkWriteCount")) { - Integer pipelineId = node.get("tags").get("pipelineId").asInt(); - JobMetrics currPipelineMetrics = - getOrCreatePipelineMetricsMap( - metricsMap, jobPipelineStatus, pipelineId); - currPipelineMetrics.setWriteRowCount( - currPipelineMetrics.getWriteRowCount() + node.get("value").asLong()); - } + private LinkedHashMap<Integer, JobMetrics> extractMetrics( + LinkedHashMap<Integer, String> jobPipelineStatus, JsonNode jsonNode) { + LinkedHashMap<Integer, JobMetrics> metricsMap = new LinkedHashMap<>(); + JsonNode sourceReceivedCount = jsonNode.get("SourceReceivedCount"); + if (sourceReceivedCount != null && sourceReceivedCount.isArray()) { + for (JsonNode node : sourceReceivedCount) { + Integer pipelineId = node.get("tags").get("pipelineId").asInt(); + JobMetrics currPipelineMetrics = + getOrCreatePipelineMetricsMap(metricsMap, jobPipelineStatus, pipelineId); + currPipelineMetrics.setReadRowCount( + currPipelineMetrics.getReadRowCount() + node.get("value").asLong()); } + } - JsonNode sinkWriteQPS = jsonNode.get("SinkWriteQPS"); - if (sinkWriteQPS != null && sinkWriteQPS.isArray()) { - for (JsonNode node : jsonNode.get("SinkWriteQPS")) { - Integer pipelineId = node.get("tags").get("pipelineId").asInt(); - JobMetrics currPipelineMetrics = - getOrCreatePipelineMetricsMap( - metricsMap, jobPipelineStatus, pipelineId); - currPipelineMetrics.setWriteQps( - currPipelineMetrics.getWriteQps() - + (new Double(node.get("value").asDouble())).longValue()); - } + JsonNode sinkWriteCount = jsonNode.get("SinkWriteCount"); + if (sinkWriteCount != null && sinkWriteCount.isArray()) { + for (JsonNode node : jsonNode.get("SinkWriteCount")) { + Integer pipelineId = node.get("tags").get("pipelineId").asInt(); + JobMetrics currPipelineMetrics = + getOrCreatePipelineMetricsMap(metricsMap, jobPipelineStatus, pipelineId); + currPipelineMetrics.setWriteRowCount( + currPipelineMetrics.getWriteRowCount() + node.get("value").asLong()); } + } - JsonNode sourceReceivedQPS = jsonNode.get("SourceReceivedQPS"); - if (sourceReceivedQPS != null && sourceReceivedQPS.isArray()) { - for (JsonNode node : jsonNode.get("SourceReceivedQPS")) { - Integer pipelineId = node.get("tags").get("pipelineId").asInt(); - JobMetrics currPipelineMetrics = - getOrCreatePipelineMetricsMap( - metricsMap, jobPipelineStatus, pipelineId); - currPipelineMetrics.setReadQps( - currPipelineMetrics.getReadQps() - + (new Double(node.get("value").asDouble())).longValue()); - } + JsonNode sinkWriteQPS = jsonNode.get("SinkWriteQPS"); + if (sinkWriteQPS != null && sinkWriteQPS.isArray()) { + for (JsonNode node : jsonNode.get("SinkWriteQPS")) { + Integer pipelineId = node.get("tags").get("pipelineId").asInt(); + JobMetrics currPipelineMetrics = + getOrCreatePipelineMetricsMap(metricsMap, jobPipelineStatus, pipelineId); + currPipelineMetrics.setWriteQps( + currPipelineMetrics.getWriteQps() + + (new Double(node.get("value").asDouble())).longValue()); } + } - JsonNode cdcRecordEmitDelay = jsonNode.get("CDCRecordEmitDelay"); - if (cdcRecordEmitDelay != null && cdcRecordEmitDelay.isArray()) { - Map<Integer, List<Long>> dataMap = new HashMap<>(); - for (JsonNode node : jsonNode.get("CDCRecordEmitDelay")) { - Integer pipelineId = node.get("tags").get("pipelineId").asInt(); - long value = node.get("value").asLong(); - dataMap.computeIfAbsent(pipelineId, n -> new ArrayList<>()).add(value); - } - dataMap.forEach( - (key, value) -> { - JobMetrics currPipelineMetrics = - getOrCreatePipelineMetricsMap( - metricsMap, jobPipelineStatus, key); - OptionalDouble average = value.stream().mapToDouble(a -> a).average(); - currPipelineMetrics.setRecordDelay( - Double.valueOf(average.isPresent() ? average.getAsDouble() : 0) - .longValue()); - }); + JsonNode sourceReceivedQPS = jsonNode.get("SourceReceivedQPS"); + if (sourceReceivedQPS != null && sourceReceivedQPS.isArray()) { + for (JsonNode node : jsonNode.get("SourceReceivedQPS")) { + Integer pipelineId = node.get("tags").get("pipelineId").asInt(); + JobMetrics currPipelineMetrics = + getOrCreatePipelineMetricsMap(metricsMap, jobPipelineStatus, pipelineId); + currPipelineMetrics.setReadQps( + currPipelineMetrics.getReadQps() + + (new Double(node.get("value").asDouble())).longValue()); } - } catch (JsonProcessingException e) { - throw new SeatunnelException( - SeatunnelErrorEnum.LOAD_ENGINE_METRICS_JSON_ERROR, - "SeaTunnel", - ExceptionUtils.getMessage(e)); } + JsonNode cdcRecordEmitDelay = jsonNode.get("CDCRecordEmitDelay"); + if (cdcRecordEmitDelay != null && cdcRecordEmitDelay.isArray()) { + Map<Integer, List<Long>> dataMap = new HashMap<>(); + for (JsonNode node : jsonNode.get("CDCRecordEmitDelay")) { + Integer pipelineId = node.get("tags").get("pipelineId").asInt(); + long value = node.get("value").asLong(); + dataMap.computeIfAbsent(pipelineId, n -> new ArrayList<>()).add(value); + } + dataMap.forEach( + (key, value) -> { + JobMetrics currPipelineMetrics = + getOrCreatePipelineMetricsMap(metricsMap, jobPipelineStatus, key); + OptionalDouble average = value.stream().mapToDouble(a -> a).average(); + currPipelineMetrics.setRecordDelay( + Double.valueOf(average.isPresent() ? average.getAsDouble() : 0) + .longValue()); + }); + } return metricsMap; } @@ -452,7 +380,7 @@ public class SeaTunnelEngineMetricsExtractor implements IEngineMetricsExtractor } } catch (Exception e) { - e.printStackTrace(); + log.error("Failed to fetch running job metrics", e); } return allRunningJobMetricsHashMap; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java similarity index 99% rename from seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java rename to seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java index 497524df..86defdd0 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobUtils.java @@ -25,7 +25,7 @@ import org.apache.seatunnel.app.domain.request.job.JobExecParam; import java.util.List; import java.util.Map; -public class JobExecParamUtil { +public class JobUtils { // The maximum length of the job execution error message, 4KB private static final int ERROR_MESSAGE_MAX_LENGTH = 4096; diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java index b898ebd9..2ff5ef73 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobControllerTest.java @@ -28,7 +28,7 @@ import org.apache.seatunnel.app.domain.request.job.PluginConfig; import org.apache.seatunnel.app.domain.response.job.JobConfigRes; import org.apache.seatunnel.app.domain.response.job.JobRes; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; -import org.apache.seatunnel.app.utils.JobUtils; +import org.apache.seatunnel.app.utils.JobTestingUtils; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; import org.junit.jupiter.api.AfterAll; @@ -71,7 +71,7 @@ public class JobControllerTest { assertTrue(result.isSuccess()); assertTrue(result.getData() > 0); Result<List<JobPipelineDetailMetricsRes>> listResult = - JobUtils.waitForJobCompletion(result.getData()); + JobTestingUtils.waitForJobCompletion(result.getData()); assertEquals(1, listResult.getData().size()); assertEquals("FINISHED", listResult.getData().get(0).getStatus()); assertEquals(5, listResult.getData().get(0).getReadRowCount()); diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java index 50a4e419..96adebe2 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java @@ -29,7 +29,7 @@ import org.apache.seatunnel.app.domain.request.job.PluginConfig; import org.apache.seatunnel.app.domain.response.executor.JobExecutionStatus; import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; -import org.apache.seatunnel.app.utils.JobUtils; +import org.apache.seatunnel.app.utils.JobTestingUtils; import org.apache.seatunnel.engine.core.job.JobStatus; import org.junit.jupiter.api.AfterAll; @@ -66,12 +66,12 @@ public class JobExecutorControllerTest { @Test public void executeJob_shouldReturnSuccess_whenValidRequest() { String jobName = "execJob" + uniqueId; - long jobVersionId = JobUtils.createJob(jobName); + long jobVersionId = JobTestingUtils.createJob(jobName); Result<Long> result = jobExecutorControllerWrapper.jobExecutor(jobVersionId); assertTrue(result.isSuccess()); assertTrue(result.getData() > 0); Result<List<JobPipelineDetailMetricsRes>> listResult = - JobUtils.waitForJobCompletion(result.getData()); + JobTestingUtils.waitForJobCompletion(result.getData()); assertEquals(1, listResult.getData().size()); assertEquals("FINISHED", listResult.getData().get(0).getStatus()); assertEquals(5, listResult.getData().get(0).getReadRowCount()); @@ -81,12 +81,12 @@ public class JobExecutorControllerTest { @Test public void executeJobWithParameters() { String jobName = "execJobWithParam" + uniqueId; - long jobVersionId = JobUtils.createJob(jobName); + long jobVersionId = JobTestingUtils.createJob(jobName); Result<Long> result = jobExecutorControllerWrapper.jobExecutor(jobVersionId); assertTrue(result.isSuccess()); assertTrue(result.getData() > 0); Result<List<JobPipelineDetailMetricsRes>> listResult = - JobUtils.waitForJobCompletion(result.getData()); + JobTestingUtils.waitForJobCompletion(result.getData()); assertEquals(1, listResult.getData().size()); assertEquals("FINISHED", listResult.getData().get(0).getStatus()); assertEquals(5, listResult.getData().get(0).getReadRowCount()); @@ -120,7 +120,7 @@ public class JobExecutorControllerTest { result = jobExecutorControllerWrapper.jobExecutor(jobVersionId, jobExecParam); assertTrue(result.isSuccess()); assertTrue(result.getData() > 0); - listResult = JobUtils.waitForJobCompletion(result.getData()); + listResult = JobTestingUtils.waitForJobCompletion(result.getData()); assertEquals(1, listResult.getData().size()); assertEquals("FINISHED", listResult.getData().get(0).getStatus()); assertEquals(numberOfRecords, listResult.getData().get(0).getReadRowCount()); @@ -138,7 +138,7 @@ public class JobExecutorControllerTest { @Test public void executeJobWithParameters_AllowQueryUpdate() { String jobName = "execJobUpdateQuery" + uniqueId; - JobCreateReq jobCreateReq = JobUtils.populateMySQLJobCreateReqFromFile(); + JobCreateReq jobCreateReq = JobTestingUtils.populateMySQLJobCreateReqFromFile(); jobCreateReq.getJobConfig().setName(jobName); jobCreateReq.getJobConfig().setDescription(jobName + " description"); String datasourceName = "execJobUpdateQuery_db" + uniqueId; @@ -184,7 +184,7 @@ public class JobExecutorControllerTest { @Test public void executeJobWithParameters_ChangeDatabase() { String jobName = "execJobChangeDatabase" + uniqueId; - JobCreateReq jobCreateReq = JobUtils.populateMySQLJobCreateReqFromFile(); + JobCreateReq jobCreateReq = JobTestingUtils.populateMySQLJobCreateReqFromFile(); jobCreateReq.getJobConfig().setName(jobName); jobCreateReq.getJobConfig().setDescription(jobName + " description"); String datasourceName = "execJobChangeDatabase_db_1" + uniqueId; @@ -241,7 +241,7 @@ public class JobExecutorControllerTest { @Test public void restoreJob_shouldReturnSuccess_whenValidRequest() { String jobName = "jobRestore" + uniqueId; - long jobVersionId = JobUtils.createJob(jobName); + long jobVersionId = JobTestingUtils.createJob(jobName); Result<Long> executorResult = jobExecutorControllerWrapper.jobExecutor(jobVersionId); assertTrue(executorResult.isSuccess()); Result<Void> result = jobExecutorControllerWrapper.jobRestore(executorResult.getData()); @@ -251,7 +251,7 @@ public class JobExecutorControllerTest { @Test public void getResource_shouldReturnSuccess_whenValidRequest() { String jobName = "getResource" + uniqueId; - long jobVersionId = JobUtils.createJob(jobName); + long jobVersionId = JobTestingUtils.createJob(jobName); Result<JobExecutorRes> result = jobExecutorControllerWrapper.resource(jobVersionId); assertTrue(result.isSuccess()); assertNotNull(result.getData()); @@ -260,7 +260,7 @@ public class JobExecutorControllerTest { @Test public void executeJob_JobStatusUpdate_WhenSubmissionFailed() { String jobName = "execJobStatus" + uniqueId; - JobCreateReq jobCreateReq = JobUtils.populateMySQLJobCreateReqFromFile(); + JobCreateReq jobCreateReq = JobTestingUtils.populateMySQLJobCreateReqFromFile(); jobCreateReq.getJobConfig().setName(jobName); jobCreateReq.getJobConfig().setDescription(jobName + " description"); String datasourceName = "execJobStatus_db_1" + uniqueId; @@ -294,13 +294,13 @@ public class JobExecutorControllerTest { @Test public void storeErrorMessageWhenJobFailed() throws InterruptedException { String jobName = "failureCause" + uniqueId; - long jobVersionId = JobUtils.createJob(jobName, true); + long jobVersionId = JobTestingUtils.createJob(jobName, true); Result<Long> result = jobExecutorControllerWrapper.jobExecutor(jobVersionId); // job submitted successfully but it will fail during execution assertTrue(result.isSuccess()); assertTrue(result.getData() > 0); Long jobInstanceId = result.getData(); - JobUtils.waitForJobCompletion(jobInstanceId); + JobTestingUtils.waitForJobCompletion(jobInstanceId); // extra second to let the data get updated in the database Thread.sleep(2000); Result<SeaTunnelJobInstanceDto> jobExecutionDetailResult = diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java index 5086ba74..060056df 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobMetricsControllerTest.java @@ -22,7 +22,7 @@ import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper; import org.apache.seatunnel.app.controller.JobMetricsControllerWrapper; import org.apache.seatunnel.app.domain.response.metrics.JobDAG; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; -import org.apache.seatunnel.app.utils.JobUtils; +import org.apache.seatunnel.app.utils.JobTestingUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -58,7 +58,7 @@ public class JobMetricsControllerTest { } private static Long executeJob(String jobName) { - Long jobVersionId = JobUtils.createJob(jobName); + Long jobVersionId = JobTestingUtils.createJob(jobName); Result<Long> jobExecutionResult = jobExecutorControllerWrapper.jobExecutor(jobVersionId); assertTrue(jobExecutionResult.isSuccess()); return jobExecutionResult.getData(); diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java index 08d4aacd..c227fc7d 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java @@ -24,7 +24,7 @@ import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper; import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper; import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; -import org.apache.seatunnel.app.utils.JobUtils; +import org.apache.seatunnel.app.utils.JobTestingUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -53,11 +53,11 @@ public class TaskInstanceControllerTest extends SeatunnelWebTestingBase { @Test public void getTaskInstanceList_shouldReturnData_whenValidRequest() { String jobName = "getTaskInstance" + uniqueId; - long jobVersionId = JobUtils.createJob(jobName); + long jobVersionId = JobTestingUtils.createJob(jobName); Result<Long> execuitonResult = jobExecutorControllerWrapper.jobExecutor(jobVersionId); assertTrue(execuitonResult.isSuccess()); Result<List<JobPipelineDetailMetricsRes>> listResult = - JobUtils.waitForJobCompletion(execuitonResult.getData()); + JobTestingUtils.waitForJobCompletion(execuitonResult.getData()); assertEquals(1, listResult.getData().size()); assertEquals("FINISHED", listResult.getData().get(0).getStatus()); diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java similarity index 99% rename from seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java rename to seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java index 2cce6317..e0e2b8d5 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobTestingUtils.java @@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertTrue; -public class JobUtils { +public class JobTestingUtils { private static JobMetricsControllerWrapper jobMetricsControllerWrapper = new JobMetricsControllerWrapper(); private static JobConfigControllerWrapper jobConfigControllerWrapper =