This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new a5074a1d [Feature] Add task monitor chart (#281)
a5074a1d is described below
commit a5074a1de8e68b48247c7aae41426a735510cdb3
Author: Jast <[email protected]>
AuthorDate: Wed May 7 14:23:38 2025 +0800
[Feature] Add task monitor chart (#281)
---
.../apache/seatunnel/app/config/MonitorConfig.java | 33 ++
.../app/controller/JobMetricsController.java | 16 +
.../seatunnel/app/dal/dao/IJobInstanceDao.java | 2 +
.../app/dal/dao/IJobMetricsHistoryDao.java | 40 +++
.../app/dal/dao/impl/JobInstanceDaoImpl.java | 5 +
.../app/dal/dao/impl/JobMetricsHistoryDaoImpl.java | 65 ++++
.../app/dal/entity/JobMetricsHistory.java | 89 +++++
.../app/dal/mapper/JobInstanceMapper.java | 5 +
.../app/dal/mapper/JobMetricsHistoryMapper.java | 49 +++
.../app/scheduler/MonitorTaskScheduler.java | 208 ++++++++++++
.../seatunnel/app/service/IJobMetricsService.java | 15 +
.../app/service/impl/JobMetricsServiceImpl.java | 33 ++
.../seatunnel/app/dal/mapper/JobInstanceMapper.xml | 5 +
.../app/dal/mapper/JobMetricsHistoryMapper.xml | 97 ++++++
.../resources/script/seatunnel_server_mysql.sql | 29 ++
seatunnel-ui/src/locales/en_US/project.ts | 10 +-
seatunnel-ui/src/locales/zh_CN/project.ts | 24 +-
.../src/service/sync-task-instance/index.ts | 9 +
seatunnel-ui/src/utils/timePickeroption.ts | 33 ++
.../task/synchronization-instance/detail/index.tsx | 15 +-
.../detail/task-metrics.module.scss | 21 ++
.../detail/task-metrics.tsx | 154 +++++++++
.../detail/use-task-metrics.ts | 366 +++++++++++++++++++++
23 files changed, 1314 insertions(+), 9 deletions(-)
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/MonitorConfig.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/MonitorConfig.java
new file mode 100644
index 00000000..a062df44
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/MonitorConfig.java
@@ -0,0 +1,33 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.app.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.client.RestTemplate;

/**
 * Spring configuration for the task-monitor feature: enables {@code @Scheduled} method execution
 * (used by the metrics-collection scheduler) and exposes a shared {@link RestTemplate} bean.
 */
@Configuration
@EnableScheduling
public class MonitorConfig {

    /**
     * Shared {@link RestTemplate} with default settings.
     *
     * <p>NOTE(review): no connect/read timeouts are configured, so calls through this template can
     * block indefinitely on an unresponsive endpoint — confirm whether timeouts should be set.
     */
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobMetricsController.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobMetricsController.java
index 673f93aa..67e7044d 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobMetricsController.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobMetricsController.java
@@ -18,10 +18,13 @@
package org.apache.seatunnel.app.controller;
import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;
import org.apache.seatunnel.app.domain.response.metrics.JobDAG;
import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetricsRes;
import org.apache.seatunnel.app.service.IJobMetricsService;
+import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
+import org.apache.seatunnel.server.common.SeatunnelException;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -68,4 +71,17 @@ public class JobMetricsController {
throws IOException {
return
Result.success(jobMetricsService.getJobPipelineSummaryMetrics(jobInstanceId));
}
+
+ @GetMapping("/history")
+ public Result<List<JobMetricsHistory>> getJobMetricsHistory(
+ @RequestParam("jobInstanceId") Long jobInstanceId,
+ @RequestParam(value = "startTime", required = false) String
startTime,
+ @RequestParam(value = "endTime", required = false) String endTime)
{
+ if (jobInstanceId == null) {
+ throw new SeatunnelException(
+ SeatunnelErrorEnum.UNKNOWN, "jobInstanceId cannot be
null");
+ }
+ return Result.success(
+ jobMetricsService.getJobMetricsHistory(jobInstanceId,
startTime, endTime));
+ }
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
index ec161933..6a67cbbc 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
@@ -49,6 +49,8 @@ public interface IJobInstanceDao {
List<JobInstance> getAllJobInstance(@NonNull List<Long> jobInstanceIdList);
+ List<JobInstance> getAllRunningJobInstance();
+
JobInstance getJobExecutionStatus(@NonNull Long jobInstanceId);
void deleteById(@NonNull Long jobInstanceId);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobMetricsHistoryDao.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobMetricsHistoryDao.java
new file mode 100644
index 00000000..a00445c2
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobMetricsHistoryDao.java
@@ -0,0 +1,40 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.app.dal.dao;

import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;

import java.util.List;

/** DAO for reading and writing job metrics history records. */
public interface IJobMetricsHistoryDao {
    /** Insert a single monitoring record. */
    void insert(JobMetricsHistory jobMetricsHistory);

    /** Batch insertion of monitoring records. */
    void insertBatch(List<JobMetricsHistory> jobMetricsHistories);

    /** Get all monitoring history records for the given job instance ID. */
    List<JobMetricsHistory> getByJobInstanceId(Long jobInstanceId);

    /** Get monitoring history records according to job instance ID and pipeline ID. */
    List<JobMetricsHistory> getByJobInstanceIdAndPipelineId(Long jobInstanceId, Integer pipelineId);

    /**
     * Query monitoring history based on job instance ID and time range.
     *
     * <p>NOTE(review): the time bounds are plain strings passed straight to the mapper — presumably
     * "yyyy-MM-dd HH:mm:ss"; confirm the expected format against the XML query.
     */
    List<JobMetricsHistory> getByJobInstanceIdAndTimeRange(
            Long jobInstanceId, String startTime, String endTime);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
index 5e9d0496..996b5551 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
@@ -87,6 +87,11 @@ public class JobInstanceDaoImpl implements IJobInstanceDao {
page, startTime, endTime, jobDefineName, jobMode,
getWorkspaceId());
}
+ @Override
+ public List<JobInstance> getAllRunningJobInstance() {
+ return jobInstanceMapper.getAllRunningJobInstance();
+ }
+
@Override
public List<JobInstance> getAllJobInstance(@NonNull List<Long>
jobInstanceIdList) {
return jobInstanceMapper.selectList(
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobMetricsHistoryDaoImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobMetricsHistoryDaoImpl.java
new file mode 100644
index 00000000..756317da
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobMetricsHistoryDaoImpl.java
@@ -0,0 +1,65 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.app.dal.dao.impl;

import org.apache.seatunnel.app.dal.dao.IJobMetricsHistoryDao;
import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;
import org.apache.seatunnel.app.dal.mapper.JobMetricsHistoryMapper;

import org.springframework.stereotype.Repository;

import javax.annotation.Resource;

import java.util.List;

/** Default {@link IJobMetricsHistoryDao} implementation; delegates to the MyBatis mapper. */
@Repository
public class JobMetricsHistoryDaoImpl implements IJobMetricsHistoryDao {

    @Resource private JobMetricsHistoryMapper jobMetricsHistoryMapper;

    /** Persist a single metrics snapshot. */
    @Override
    public void insert(JobMetricsHistory jobMetricsHistory) {
        jobMetricsHistoryMapper.insert(jobMetricsHistory);
    }

    /** Persist a batch of metrics snapshots; a no-op for an empty list. */
    @Override
    public void insertBatch(List<JobMetricsHistory> jobMetricsHistories) {
        if (jobMetricsHistories.isEmpty()) {
            return;
        }
        jobMetricsHistoryMapper.insertBatchMetrics(jobMetricsHistories);
    }

    /** All history rows recorded for one job instance. */
    @Override
    public List<JobMetricsHistory> getByJobInstanceId(Long jobInstanceId) {
        return jobMetricsHistoryMapper.queryJobMetricsHistoryByInstanceId(jobInstanceId);
    }

    /** History rows for one pipeline of a job instance. */
    @Override
    public List<JobMetricsHistory> getByJobInstanceIdAndPipelineId(
            Long jobInstanceId, Integer pipelineId) {
        return jobMetricsHistoryMapper.queryJobMetricsHistoryByInstanceIdAndPipelineId(
                jobInstanceId, pipelineId);
    }

    /** History rows for a job instance restricted to the given time window. */
    @Override
    public List<JobMetricsHistory> getByJobInstanceIdAndTimeRange(
            Long jobInstanceId, String startTime, String endTime) {
        return jobMetricsHistoryMapper.queryJobMetricsHistoryByInstanceIdAndTimeRange(
                jobInstanceId, startTime, endTime);
    }
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetricsHistory.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetricsHistory.java
new file mode 100644
index 00000000..abeb565c
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetricsHistory.java
@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.seatunnel.app.dal.entity;

import org.apache.seatunnel.engine.core.job.JobStatus;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

/**
 * One pipeline-level metrics snapshot persisted to {@code t_st_job_metrics_history}.
 *
 * <p>We will expand the JobMetricsHistory metric in the future, so we did not use the JobMetrics
 * table.
 *
 * <p>(The original Javadoc sat between the annotations and the class keyword, where the javadoc
 * tool treats it as dangling; it belongs before the first annotation.)
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("t_st_job_metrics_history")
public class JobMetricsHistory {

    // IdType.INPUT: the primary key is supplied by the caller, not auto-generated by the DB.
    @TableId(value = "id", type = IdType.INPUT)
    private Long id;

    @TableField("job_instance_id")
    private Long jobInstanceId;

    @TableField("pipeline_id")
    private Integer pipelineId;

    // Cumulative rows read by this pipeline at snapshot time.
    @TableField("read_row_count")
    private long readRowCount;

    // Cumulative rows written by this pipeline at snapshot time.
    @TableField("write_row_count")
    private long writeRowCount;

    @TableField("source_table_names")
    private String sourceTableNames;

    @TableField("sink_table_names")
    private String sinkTableNames;

    @TableField("read_qps")
    private long readQps;

    @TableField("write_qps")
    private long writeQps;

    @TableField("record_delay")
    private long recordDelay;

    // Engine job status at snapshot time.
    @TableField("status")
    private JobStatus status;

    @TableField("create_user_id")
    private Integer createUserId;

    @TableField("update_user_id")
    private Integer updateUserId;

    // Serialized in UTC for API responses regardless of server timezone.
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "UTC")
    @TableField("create_time")
    private LocalDateTime createTime;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "UTC")
    @TableField("update_time")
    private LocalDateTime updateTime;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
index fb013a46..1270c524 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
@@ -29,6 +29,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import java.util.Date;
+import java.util.List;
@Mapper
public interface JobInstanceMapper extends BaseMapper<JobInstance> {
@@ -39,4 +40,8 @@ public interface JobInstanceMapper extends
BaseMapper<JobInstance> {
@Param("jobDefineName") String jobDefineName,
@Param("jobMode") JobMode jobMode,
@Param("workspaceId") Long workspaceId);
+
+ JobInstance getJobExecutionStatus(@Param("jobInstanceId") Long
jobInstanceId);
+
+ List<JobInstance> getAllRunningJobInstance();
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobMetricsHistoryMapper.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobMetricsHistoryMapper.java
new file mode 100644
index 00000000..d3af85eb
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobMetricsHistoryMapper.java
@@ -0,0 +1,49 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.app.dal.mapper;

import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;

import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

import java.util.List;

/** MyBatis mapper for the {@code t_st_job_metrics_history} table. */
@Mapper
public interface JobMetricsHistoryMapper extends BaseMapper<JobMetricsHistory> {

    /** Batch insertion of monitoring history records. */
    void insertBatchMetrics(
            @Param("jobMetricsHistories") List<JobMetricsHistory> jobMetricsHistories);

    /** Query monitoring history based on job instance ID. */
    List<JobMetricsHistory> queryJobMetricsHistoryByInstanceId(
            @Param("jobInstanceId") Long jobInstanceId);

    /** Query monitoring history based on job instance ID and pipeline ID. */
    List<JobMetricsHistory> queryJobMetricsHistoryByInstanceIdAndPipelineId(
            @Param("jobInstanceId") Long jobInstanceId, @Param("pipelineId") Integer pipelineId);

    /** Query monitoring history based on job instance ID and time range. */
    List<JobMetricsHistory> queryJobMetricsHistoryByInstanceIdAndTimeRange(
            @Param("jobInstanceId") Long jobInstanceId,
            @Param("startTime") String startTime,
            @Param("endTime") String endTime);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/scheduler/MonitorTaskScheduler.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/scheduler/MonitorTaskScheduler.java
new file mode 100644
index 00000000..de30281f
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/scheduler/MonitorTaskScheduler.java
@@ -0,0 +1,208 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.app.scheduler;

import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.seatunnel.app.dal.dao.IJobInstanceDao;
import org.apache.seatunnel.app.dal.dao.IJobMetricsHistoryDao;
import org.apache.seatunnel.app.dal.entity.JobInstance;
import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;
import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
import org.apache.seatunnel.app.service.IJobMetricsService;
import org.apache.seatunnel.engine.core.job.JobStatus;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * Collects pipeline-level metrics for running job instances and persists them as history rows.
 *
 * <p>Two scheduled loops cooperate: {@link #updateJobInstance()} refreshes a cache of RUNNING
 * instances once a minute, and {@link #scheduleTasks()} walks that cache every five seconds,
 * fetching detail metrics on a worker pool and batch-inserting one history row per pipeline.
 */
@Slf4j
@Component
public class MonitorTaskScheduler {

    /** Worker pool for per-instance metrics collection; bounded queue with caller-runs fallback. */
    private final ExecutorService executorService;

    @Resource private IJobInstanceDao jobInstanceDao;

    @Resource private IJobMetricsService jobMetricsService;

    @Resource private IJobMetricsHistoryDao jobMetricsHistoryDao;

    /** Cache of running job instances, refreshed by {@link #updateJobInstance()}. */
    private final ConcurrentHashMap<Long, JobInstance> jobInstanceMap = new ConcurrentHashMap<>();

    /** Guards the clear+putAll refresh so readers never observe a half-refreshed cache. */
    private final Object mapLock = new Object();

    /**
     * Process-wide monotonic sequence for history-record primary keys, seeded with the startup
     * timestamp. The previous implementation returned {@code System.currentTimeMillis()} directly,
     * which yields duplicate IDs whenever two records are built in the same millisecond — which is
     * guaranteed for a multi-pipeline batch — and breaks the batch insert, since the entity's
     * {@code IdType.INPUT} key goes straight into the primary-key column.
     */
    private static final AtomicLong ID_GENERATOR = new AtomicLong(System.currentTimeMillis());

    public MonitorTaskScheduler() {
        // Create thread pool: 5 core / 10 max threads, 100-slot queue; overflow runs on the
        // scheduler thread (CallerRunsPolicy) so submissions throttle rather than drop.
        this.executorService =
                new ThreadPoolExecutor(
                        5,
                        10,
                        60L,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(100),
                        new ThreadFactoryBuilder()
                                .setNameFormat("task-processor-%d")
                                .setUncaughtExceptionHandler(
                                        (t, e) ->
                                                log.error(
                                                        "Thread {} encountered uncaught exception",
                                                        t.getName(),
                                                        e))
                                .build(),
                        new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Refreshes the running-instance cache from the database once a minute. Never throws. */
    @Scheduled(initialDelay = 0, fixedRate = 60000)
    public void updateJobInstance() {
        try {
            log.info("Start updating job instance information...");
            List<JobInstance> allJobInstance = jobInstanceDao.getAllRunningJobInstance();

            // Duplicate IDs keep the last occurrence; the query should not produce any, but the
            // merge function avoids an IllegalStateException if it ever does.
            Map<Long, JobInstance> newInstanceMap =
                    allJobInstance.stream()
                            .collect(
                                    Collectors.toMap(
                                            JobInstance::getId,
                                            instance -> instance,
                                            (existing, replacement) -> replacement));

            synchronized (mapLock) {
                jobInstanceMap.clear();
                jobInstanceMap.putAll(newInstanceMap);
            }

            log.debug(
                    "Job instance information updated, current total instances: {}",
                    jobInstanceMap.size());
        } catch (Exception e) {
            log.error("Error updating job instance information", e);
        }
    }

    /** Cached instance by ID, or {@code null} if it is not currently known to be running. */
    public JobInstance getJobInstance(Long jobInstanceId) {
        synchronized (mapLock) {
            return jobInstanceMap.get(jobInstanceId);
        }
    }

    /** Snapshot copy of all cached instances. */
    public List<JobInstance> getAllJobInstances() {
        synchronized (mapLock) {
            return new ArrayList<>(jobInstanceMap.values());
        }
    }

    /**
     * Every five seconds, submits a metrics-collection task per RUNNING instance. Each task fetches
     * pipeline detail metrics and batch-inserts one history row per pipeline; failures are logged
     * per instance so one bad job cannot stall the others.
     */
    @Scheduled(fixedDelay = 5000)
    public void scheduleTasks() {
        List<JobInstance> instances;
        synchronized (mapLock) {
            instances = new ArrayList<>(jobInstanceMap.values());
        }

        instances.forEach(
                jobInstance -> {
                    if (jobInstance.getJobStatus() != JobStatus.RUNNING) {
                        return;
                    }
                    try {
                        executorService.submit(
                                () -> {
                                    try {
                                        Long jobInstanceId = jobInstance.getId();
                                        List<JobPipelineDetailMetricsRes> metricsResList =
                                                jobMetricsService.getJobPipelineDetailMetricsRes(
                                                        jobInstance);

                                        if (metricsResList != null && !metricsResList.isEmpty()) {
                                            List<JobMetricsHistory> historyList =
                                                    metricsResList.stream()
                                                            .map(
                                                                    metrics ->
                                                                            convertToJobMetricsHistory(
                                                                                    metrics,
                                                                                    jobInstanceId))
                                                            .collect(Collectors.toList());

                                            jobMetricsHistoryDao.insertBatch(historyList);
                                            log.debug(
                                                    "Successfully saved metrics for job {}, total {} records",
                                                    jobInstanceId,
                                                    historyList.size());
                                        }
                                    } catch (Exception e) {
                                        log.error("Error saving job metrics", e);
                                    }
                                });
                    } catch (Exception e) {
                        log.error("Task scheduling error", e);
                    }
                });
    }

    /**
     * Maps one pipeline metrics response onto a history entity. User IDs are set to -1 to mark the
     * rows as system-generated.
     */
    private JobMetricsHistory convertToJobMetricsHistory(
            JobPipelineDetailMetricsRes metrics, Long jobInstanceId) {
        return JobMetricsHistory.builder()
                .id(generateId())
                .jobInstanceId(jobInstanceId)
                .pipelineId(metrics.getPipelineId())
                .readRowCount(metrics.getReadRowCount())
                .writeRowCount(metrics.getWriteRowCount())
                .sourceTableNames(metrics.getSourceTableNames())
                .sinkTableNames(metrics.getSinkTableNames())
                .readQps(metrics.getReadQps())
                .writeQps(metrics.getWriteQps())
                .recordDelay(metrics.getRecordDelay())
                .status(metrics.getStatus())
                .createUserId(-1)
                .updateUserId(-1)
                .build();
    }

    /**
     * Returns a process-unique, monotonically increasing record ID.
     *
     * <p>Unique within a single server process; a distributed ID generator (e.g. Snowflake) should
     * replace this if the server is ever deployed with multiple instances.
     */
    private Long generateId() {
        return ID_GENERATOR.getAndIncrement();
    }

    /** Drains the worker pool on shutdown, escalating to shutdownNow after 60 seconds. */
    @PreDestroy
    public void shutdown() {
        log.info("Shutting down task scheduler...");
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
index 4fbaca40..20324af0 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.app.service;
import org.apache.seatunnel.app.dal.entity.JobInstance;
+import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;
import org.apache.seatunnel.app.domain.response.metrics.JobDAG;
import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetricsRes;
@@ -40,6 +41,9 @@ public interface IJobMetricsService {
JobDAG getJobDAG(@NonNull Long jobInstanceId) throws
JsonProcessingException;
+ public List<JobPipelineDetailMetricsRes> getJobPipelineDetailMetricsRes(
+ @NonNull JobInstance jobInstance);
+
ImmutablePair<Long, String> getInstanceIdAndEngineId(@NonNull String key);
void syncJobDataToDb(@NonNull JobInstance jobInstance, @NonNull String
jobEngineId);
@@ -51,4 +55,15 @@ public interface IJobMetricsService {
@NonNull Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
@NonNull List<Long> jobInstanceIdList,
@NonNull JobMode jobMode);
+
+ /**
+ * Get job metrics history data
+ *
+ * @param jobInstanceId job instance id
+ * @return List of metrics data points
+ */
+ List<JobMetricsHistory> getJobMetricsHistory(@NonNull Long jobInstanceId);
+
+ List<JobMetricsHistory> getJobMetricsHistory(
+ Long jobInstanceId, String startTime, String endTime);
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
index 0d9bad0f..9637b277 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.app.dal.dao.IJobMetricsDao;
import org.apache.seatunnel.app.dal.entity.JobInstance;
import org.apache.seatunnel.app.dal.entity.JobInstanceHistory;
import org.apache.seatunnel.app.dal.entity.JobMetrics;
+import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;
+import org.apache.seatunnel.app.dal.mapper.JobMetricsHistoryMapper;
import org.apache.seatunnel.app.domain.response.engine.Engine;
import org.apache.seatunnel.app.domain.response.metrics.JobDAG;
import
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
@@ -47,6 +49,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
@@ -72,6 +75,8 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
@Resource private IJobInstanceDao jobInstanceDao;
+ @Autowired private JobMetricsHistoryMapper jobMetricsHistoryMapper;
+
@Override
public List<JobPipelineSummaryMetricsRes> getJobPipelineSummaryMetrics(
@NonNull Long jobInstanceId) {
@@ -474,6 +479,19 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
.collect(Collectors.toList());
}
+ @Override
+ public List<JobPipelineDetailMetricsRes> getJobPipelineDetailMetricsRes(
+ @NonNull JobInstance jobInstance) {
+ if (JobUtils.isJobEndStatus(jobInstance.getJobStatus())) {
+ return new ArrayList<>();
+ }
+ List<JobMetrics> jobPipelineDetailMetrics =
+ getJobMetricsFromEngine(jobInstance,
jobInstance.getJobEngineId());
+ return jobPipelineDetailMetrics.stream()
+ .map(this::wrapperJobMetrics)
+ .collect(Collectors.toList());
+ }
+
@Override
public JobDAG getJobDAG(@NonNull Long jobInstanceId) {
int userId = ServletUtils.getCurrentUserId();
@@ -683,4 +701,19 @@ public class JobMetricsServiceImpl extends
SeatunnelBaseServiceImpl implements I
jobMetricsDao.getJobMetricsMapper().insertBatchMetrics(list);
}
}
+
+ @Override
+ @NonNull public List<JobMetricsHistory> getJobMetricsHistory(@NonNull Long
jobInstanceId) {
+ return
jobMetricsHistoryMapper.queryJobMetricsHistoryByInstanceId(jobInstanceId);
+ }
+
+ @Override
+ public List<JobMetricsHistory> getJobMetricsHistory(
+ Long jobInstanceId, String startTime, String endTime) {
+ if (StringUtils.isNotEmpty(startTime) &&
StringUtils.isNotEmpty(endTime)) {
+ return
jobMetricsHistoryMapper.queryJobMetricsHistoryByInstanceIdAndTimeRange(
+ jobInstanceId, startTime, endTime);
+ }
+ return getJobMetricsHistory(jobInstanceId);
+ }
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
index 0978573d..f7b81555 100644
---
a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
+++
b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
@@ -57,4 +57,9 @@
</where>
ORDER BY ji.create_time DESC
</select>
+ <select id="getAllRunningJobInstance"
resultType="org.apache.seatunnel.app.dal.entity.JobInstance">
+ SELECT <include refid="Base_Column_List"/>
+ FROM t_st_job_instance t
+ WHERE t.job_status = 'RUNNING'
+ </select>
</mapper>
diff --git
a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobMetricsHistoryMapper.xml
b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobMetricsHistoryMapper.xml
new file mode 100644
index 00000000..1cda1ada
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobMetricsHistoryMapper.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper
namespace="org.apache.seatunnel.app.dal.mapper.JobMetricsHistoryMapper">
+ <resultMap id="BaseResultMap"
type="org.apache.seatunnel.app.dal.entity.JobMetricsHistory">
+ <id column="id" jdbcType="BIGINT" property="id"/>
+ <result column="job_instance_id" jdbcType="BIGINT"
property="jobInstanceId"/>
+ <result column="pipeline_id" jdbcType="INTEGER" property="pipelineId"/>
+ <result column="read_row_count" jdbcType="BIGINT"
property="readRowCount"/>
+ <result column="write_row_count" jdbcType="BIGINT"
property="writeRowCount"/>
+ <result column="source_table_names" jdbcType="VARCHAR"
property="sourceTableNames"/>
+ <result column="sink_table_names" jdbcType="VARCHAR"
property="sinkTableNames"/>
+ <result column="read_qps" jdbcType="BIGINT" property="readQps"/>
+ <result column="write_qps" jdbcType="BIGINT" property="writeQps"/>
+ <result column="record_delay" jdbcType="BIGINT"
property="recordDelay"/>
+ <result column="status" jdbcType="VARCHAR" property="status"/>
+ <result column="create_user_id" jdbcType="INTEGER"
property="createUserId"/>
+ <result column="update_user_id" jdbcType="INTEGER"
property="updateUserId"/>
+ <result column="create_time" jdbcType="TIMESTAMP"
property="createTime"/>
+ <result column="update_time" jdbcType="TIMESTAMP"
property="updateTime"/>
+ </resultMap>
+
+ <sql id="Base_Column_List">
+ id, job_instance_id, pipeline_id, read_row_count, write_row_count,
+ source_table_names, sink_table_names, read_qps, write_qps,
record_delay,
+ status, create_user_id, update_user_id, create_time, update_time
+ </sql>
+
+ <insert id="insertBatchMetrics">
+ insert into t_st_job_metrics_history (
+ <include refid="Base_Column_List"/>
+ )
+ values
+ <foreach collection="jobMetricsHistories" item="metrics" separator=",">
+ (
+ #{metrics.id},
+ #{metrics.jobInstanceId},
+ #{metrics.pipelineId},
+ #{metrics.readRowCount},
+ #{metrics.writeRowCount},
+ #{metrics.sourceTableNames},
+ #{metrics.sinkTableNames},
+ #{metrics.readQps},
+ #{metrics.writeQps},
+ #{metrics.recordDelay},
+ #{metrics.status},
+ #{metrics.createUserId},
+ #{metrics.updateUserId},
+ #{metrics.createTime},
+ #{metrics.updateTime}
+ )
+ </foreach>
+ </insert>
+
+ <select id="queryJobMetricsHistoryByInstanceId" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from t_st_job_metrics_history
+ where job_instance_id = #{jobInstanceId}
+ order by create_time asc
+ </select>
+
+ <select id="queryJobMetricsHistoryByInstanceIdAndPipelineId"
resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from t_st_job_metrics_history
+ where job_instance_id = #{jobInstanceId}
+ and pipeline_id = #{pipelineId}
+ order by create_time desc
+ </select>
+
+ <select id="queryJobMetricsHistoryByInstanceIdAndTimeRange"
resultType="org.apache.seatunnel.app.dal.entity.JobMetricsHistory">
+ SELECT *
+ FROM t_st_job_metrics_history
+ WHERE job_instance_id = #{jobInstanceId}
+        <if test="startTime != null">
+            AND create_time &gt;= #{startTime}
+        </if>
+        <if test="endTime != null">
+            AND create_time &lt;= #{endTime}
+        </if>
+ ORDER BY create_time ASC
+ </select>
+</mapper>
\ No newline at end of file
diff --git
a/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
b/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
index 5dd91e4c..e3660fba 100644
---
a/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
+++
b/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
@@ -284,3 +284,32 @@ CREATE TABLE `workspace` (
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE =
utf8mb4_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `seatunnel`.`workspace`(`workspace_name`,`description`) values
('default', 'default workspace');
+
+
+DROP TABLE IF EXISTS `t_st_job_metrics_history`;
+-- auto-generated definition
+create table t_st_job_metrics_history
+(
+ id bigint not null
+ primary key,
+ job_instance_id bigint not null,
+ pipeline_id int(20) not null,
+ read_row_count bigint not null,
+ write_row_count bigint not null,
+ source_table_names varchar(200) null,
+ sink_table_names varchar(200) null,
+ read_qps bigint null,
+ write_qps bigint null,
+ record_delay bigint null,
+ status varchar(20) null,
+ create_user_id int(20) not null,
+ update_user_id int(20) null,
+ create_time timestamp(3) default CURRENT_TIMESTAMP(3) not null,
+ update_time timestamp(3) default CURRENT_TIMESTAMP(3) not null on
update CURRENT_TIMESTAMP(3)
+)
+ collate = utf8mb4_bin
+ row_format = DYNAMIC;
+
+create index idx_job_instance_id_create_time
+ on t_st_job_metrics_history (job_instance_id, create_time);
+
diff --git a/seatunnel-ui/src/locales/en_US/project.ts
b/seatunnel-ui/src/locales/en_US/project.ts
index b6bc46a6..d137c9e4 100644
--- a/seatunnel-ui/src/locales/en_US/project.ts
+++ b/seatunnel-ui/src/locales/en_US/project.ts
@@ -293,6 +293,13 @@ export default {
'The following workflow do not meet the execution conditions'
},
task: {
+ metrics: {
+ read_row_count: 'Read Data Trend',
+ write_row_count: 'Write Data Trend',
+ read_qps: 'Read QPS Trend',
+ write_qps: 'Write QPS Trend',
+ record_delay: 'Data Delay Trend(s)'
+ },
cancel_full_screen: 'Cancel full screen',
enter_full_screen: 'Enter full screen',
current_task_settings: 'Current task settings',
@@ -1146,7 +1153,8 @@ export default {
cancel: 'Cancel',
delete: 'Delete',
delete_confirm: 'Delete?',
- error_message: 'Error'
+ error_message: 'Error',
+ task_metrics: 'Task Monitor'
},
menu: {
fav: 'Favorites',
diff --git a/seatunnel-ui/src/locales/zh_CN/project.ts
b/seatunnel-ui/src/locales/zh_CN/project.ts
index 3e059f18..c69e6f3d 100644
--- a/seatunnel-ui/src/locales/zh_CN/project.ts
+++ b/seatunnel-ui/src/locales/zh_CN/project.ts
@@ -290,6 +290,13 @@ export default {
dependent_chain_condition_title: '以下工作流不满足执行条件'
},
task: {
+ metrics: {
+ read_row_count: '读取数据量趋势',
+ write_row_count: '写入数据量趋势',
+ read_qps: '读取QPS趋势',
+ write_qps: '写入QPS趋势',
+ record_delay: '数据延迟趋势(秒)'
+ },
cancel_full_screen: '取消全屏',
enter_full_screen: '全屏',
current_task_settings: '当前任务设置',
@@ -1113,7 +1120,8 @@ export default {
confirm: '确定',
cancel: '取消',
delete: '删除',
- delete_confirm: '确定删除吗?'
+ delete_confirm: '确定删除吗?',
+ task_metrics: '任务监控'
},
menu: {
fav: '收藏组件',
@@ -1147,5 +1155,17 @@ export default {
all_project: '全部项目',
next_step: '下一步',
pre_step: '上一步',
- project_name: '项目名称'
+ project_name: '项目名称',
+ metrics: {
+ last_1_minute: '最近1分钟',
+ last_10_minutes: '最近10分钟',
+ last_1_hour: '最近1小时',
+ last_3_hours: '最近3小时',
+ last_1_day: '最近1天',
+ last_7_days: '最近7天',
+ custom_time: '自定义时间',
+ start_time: '开始时间',
+ end_time: '结束时间',
+ metrics_title: '任务指标'
+ }
}
diff --git a/seatunnel-ui/src/service/sync-task-instance/index.ts
b/seatunnel-ui/src/service/sync-task-instance/index.ts
index c6c98a01..b39b05ac 100644
--- a/seatunnel-ui/src/service/sync-task-instance/index.ts
+++ b/seatunnel-ui/src/service/sync-task-instance/index.ts
@@ -89,3 +89,12 @@ export function hanldleDelJob(id: number): any {
method: 'delete'
})
}
+
+export function queryJobMetricsHistory(params: { jobInstanceId: string |
number }): any {
+ return axios({
+ url: '/job/metrics/history',
+ method: 'get',
+ timeout: 60000,
+ params
+ })
+}
diff --git a/seatunnel-ui/src/utils/timePickeroption.ts
b/seatunnel-ui/src/utils/timePickeroption.ts
index a78c9418..cae5d3de 100644
--- a/seatunnel-ui/src/utils/timePickeroption.ts
+++ b/seatunnel-ui/src/utils/timePickeroption.ts
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+import { format, subDays, subHours, subMinutes } from 'date-fns'
+
export const getNowDate = (): any => {
return [
new Date(new Date().toLocaleDateString()).getTime(),
@@ -54,3 +56,34 @@ export function getRangeShortCuts(t: any) {
})
return rangeShortCuts
}
+
+export const getMetricsRangeShortcuts = (t: any) => {
+ const now = new Date()
+
+ return [
+ {
+ label: t('project.metrics.last_1_minute'),
+ value: () => [subMinutes(now, 1).getTime(), now.getTime()]
+ },
+ {
+ label: t('project.metrics.last_10_minutes'),
+ value: () => [subMinutes(now, 10).getTime(), now.getTime()]
+ },
+ {
+ label: t('project.metrics.last_1_hour'),
+ value: () => [subHours(now, 1).getTime(), now.getTime()]
+ },
+ {
+ label: t('project.metrics.last_3_hours'),
+ value: () => [subHours(now, 3).getTime(), now.getTime()]
+ },
+ {
+ label: t('project.metrics.last_1_day'),
+ value: () => [subDays(now, 1).getTime(), now.getTime()]
+ },
+ {
+ label: t('project.metrics.last_7_days'),
+ value: () => [subDays(now, 7).getTime(), now.getTime()]
+ }
+ ]
+}
diff --git
a/seatunnel-ui/src/views/task/synchronization-instance/detail/index.tsx
b/seatunnel-ui/src/views/task/synchronization-instance/detail/index.tsx
index 7174070c..30abfc5c 100644
--- a/seatunnel-ui/src/views/task/synchronization-instance/detail/index.tsx
+++ b/seatunnel-ui/src/views/task/synchronization-instance/detail/index.tsx
@@ -18,6 +18,7 @@
import { defineComponent } from 'vue'
import { RunningInstance } from './running-instance'
import { TaskDefinition } from './task-definition'
+import { TaskMetrics } from './task-metrics'
import {
NBreadcrumb,
NBreadcrumbItem,
@@ -70,20 +71,22 @@ const SynchronizationInstanceDetail = defineComponent({
<NTabs type='segment'>
<NTabPane
name='task-definition'
- tab={this.t(
- 'project.synchronization_instance.sync_task_definition'
- )}
+
tab={this.t('project.synchronization_instance.sync_task_definition')}
>
<TaskDefinition />
</NTabPane>
<NTabPane
name='running-instance'
- tab={this.t(
- 'project.synchronization_instance.data_pipeline_running_instance'
- )}
+
tab={this.t('project.synchronization_instance.data_pipeline_running_instance')}
>
<RunningInstance />
</NTabPane>
+ <NTabPane
+ name='task-metrics'
+ tab={this.t('project.synchronization_instance.task_metrics')}
+ >
+ <TaskMetrics />
+ </NTabPane>
</NTabs>
</NSpace>
)
diff --git
a/seatunnel-ui/src/views/task/synchronization-instance/detail/task-metrics.module.scss
b/seatunnel-ui/src/views/task/synchronization-instance/detail/task-metrics.module.scss
new file mode 100644
index 00000000..14dc69d1
--- /dev/null
+++
b/seatunnel-ui/src/views/task/synchronization-instance/detail/task-metrics.module.scss
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+.chart {
+ height: 400px;
+ width: 100%;
+}
\ No newline at end of file
diff --git
a/seatunnel-ui/src/views/task/synchronization-instance/detail/task-metrics.tsx
b/seatunnel-ui/src/views/task/synchronization-instance/detail/task-metrics.tsx
new file mode 100644
index 00000000..f5e81c93
--- /dev/null
+++
b/seatunnel-ui/src/views/task/synchronization-instance/detail/task-metrics.tsx
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { defineComponent, onMounted, onUnmounted, nextTick } from 'vue'
+import { NCard, NGrid, NGi, NSpace, NDatePicker, NSelect } from 'naive-ui'
+import { useTaskMetrics } from './use-task-metrics'
+import styles from './task-metrics.module.scss'
+import { useI18n } from 'vue-i18n'
+
+const TaskMetrics = defineComponent({
+ name: 'TaskMetrics',
+ setup() {
+ const { variables, initCharts, updateCharts, handleDateRangeChange,
handleTimeOptionChange } = useTaskMetrics()
+ let timer: ReturnType<typeof setInterval>
+ const { t } = useI18n()
+
+ onMounted(async () => {
+ await nextTick()
+ initCharts()
+ updateCharts()
+ timer = setInterval(() => {
+ updateCharts()
+ }, 10000)
+ })
+
+ onUnmounted(() => {
+ if (timer) {
+ clearInterval(timer)
+ }
+ variables.readRowCountChart?.dispose()
+ variables.writeRowCountChart?.dispose()
+ variables.readQpsChart?.dispose()
+ variables.writeQpsChart?.dispose()
+ variables.delayChart?.dispose()
+ })
+
+ return {
+ variables,
+ handleDateRangeChange,
+ handleTimeOptionChange,
+ t
+ }
+ },
+ render() {
+ return (
+ <NGrid x-gap={12} cols={2}>
+ <NGi span={2}>
+ <NCard
+ title={this.t('project.metrics.metrics_title')}
+ headerStyle={{ padding: '16px 20px' }}
+ contentStyle={{ padding: '4px 20px 20px' }}
+ >
+ {{
+ header: () => (
+ <NSpace justify="space-between" align="center" style="width:
100%">
+ <span class="n-card-header__main">
+ {this.t('project.metrics.metrics_title')}
+ </span>
+ <NSpace align="center">
+ <NSelect
+ value={this.variables.selectedTimeOption}
+ options={this.variables.timeOptions}
+ onUpdateValue={this.handleTimeOptionChange}
+ style={{ width: '150px' }}
+ />
+ {this.variables.showDatePicker && (
+ <NDatePicker
+ type="datetimerange"
+ value={this.variables.dateRange}
+ onUpdateValue={this.handleDateRangeChange}
+ clearable
+ defaultTime={['00:00:00', '23:59:59']}
+ valueFormat="timestamp"
+ actions={['clear', 'confirm']}
+ style={{ width: '320px' }}
+ placeholder={[
+ this.t('project.metrics.start_time'),
+ this.t('project.metrics.end_time')
+ ]}
+ placement="bottom-end"
+ size="small"
+ to={false}
+ />
+ )}
+ </NSpace>
+ </NSpace>
+ ),
+ default: () => (
+ <NGrid x-gap={12} y-gap={12} cols={2}>
+ <NGi>
+ <NCard>
+ <div
+ ref={(el) => (this.variables.readRowCountChartRef =
el)}
+ class={styles.chart}
+ />
+ </NCard>
+ </NGi>
+ <NGi>
+ <NCard>
+ <div
+ ref={(el) => (this.variables.writeRowCountChartRef =
el)}
+ class={styles.chart}
+ />
+ </NCard>
+ </NGi>
+ <NGi>
+ <NCard>
+ <div
+ ref={(el) => (this.variables.readQpsChartRef = el)}
+ class={styles.chart}
+ />
+ </NCard>
+ </NGi>
+ <NGi>
+ <NCard>
+ <div
+ ref={(el) => (this.variables.writeQpsChartRef = el)}
+ class={styles.chart}
+ />
+ </NCard>
+ </NGi>
+ <NGi span={2}>
+ <NCard>
+ <div
+ ref={(el) => (this.variables.delayChartRef = el)}
+ class={styles.chart}
+ />
+ </NCard>
+ </NGi>
+ </NGrid>
+ )
+ }}
+ </NCard>
+ </NGi>
+ </NGrid>
+ )
+ }
+})
+
+export { TaskMetrics }
\ No newline at end of file
diff --git
a/seatunnel-ui/src/views/task/synchronization-instance/detail/use-task-metrics.ts
b/seatunnel-ui/src/views/task/synchronization-instance/detail/use-task-metrics.ts
new file mode 100644
index 00000000..b2636335
--- /dev/null
+++
b/seatunnel-ui/src/views/task/synchronization-instance/detail/use-task-metrics.ts
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive, ref, onMounted } from 'vue'
+import type { EChartsOption, LineSeriesOption } from 'echarts'
+import * as echarts from 'echarts'
+import { useRoute } from 'vue-router'
+import { useI18n } from 'vue-i18n'
+import { format, subDays, subHours, subMinutes } from 'date-fns'
+import {
+ queryJobMetricsHistory,
+ querySyncTaskInstanceDag,
+ querySyncTaskInstanceDetail,
+ queryRunningInstancePaging
+} from '@/service/sync-task-instance'
+
+export function useTaskMetrics() {
+ const route = useRoute()
+ const { t } = useI18n()
+
+ const timeOptions = [
+ {
+ label: t('project.metrics.last_1_minute'),
+ value: '1min',
+ getTime: () => [subMinutes(new Date(), 1), new Date()]
+ },
+ {
+ label: t('project.metrics.last_10_minutes'),
+ value: '10min',
+ getTime: () => [subMinutes(new Date(), 10), new Date()]
+ },
+ {
+ label: t('project.metrics.last_1_hour'),
+ value: '1hour',
+ getTime: () => [subHours(new Date(), 1), new Date()]
+ },
+ {
+ label: t('project.metrics.last_3_hours'),
+ value: '3hours',
+ getTime: () => [subHours(new Date(), 3), new Date()]
+ },
+ {
+ label: t('project.metrics.last_1_day'),
+ value: '1day',
+ getTime: () => [subDays(new Date(), 1), new Date()]
+ },
+ {
+ label: t('project.metrics.last_7_days'),
+ value: '7days',
+ getTime: () => [subDays(new Date(), 7), new Date()]
+ },
+ {
+ label: t('project.metrics.custom_time'),
+ value: 'custom'
+ }
+ ]
+
+ const variables = reactive({
+ readRowCountChartRef: ref(),
+ writeRowCountChartRef: ref(),
+ readQpsChartRef: ref(),
+ writeQpsChartRef: ref(),
+ delayChartRef: ref(),
+ readRowCountChart: null as echarts.ECharts | null,
+ writeRowCountChart: null as echarts.ECharts | null,
+ readQpsChart: null as echarts.ECharts | null,
+ writeQpsChart: null as echarts.ECharts | null,
+ delayChart: null as echarts.ECharts | null,
+ metricsData: [] as any[],
+ dateRange: null as [number, number] | null,
+ selectedTimeOption: '1hour',
+ showDatePicker: false,
+ timeOptions
+ })
+
+ const formatTimeToString = (timestamp: number): string => {
+ return format(timestamp, 'yyyy-MM-dd HH:mm:ss')
+ }
+
+ const formatTimeData = (data: any[]) => {
+ return data.map(item => {
+ try {
+ const date = new Date(item.createTime)
+ return format(date, 'HH:mm:ss')
+ } catch (err) {
+ console.error('Error formatting time:', err)
+ return ''
+ }
+ })
+ }
+
+ const getChartOption = (title: string, data: any[], key: string):
EChartsOption => ({
+ title: {
+ text: title,
+ textStyle: {
+ fontSize: 14,
+ fontWeight: 'normal'
+ },
+ left: 'center'
+ },
+ tooltip: {
+ show: true,
+ trigger: 'item',
+ axisPointer: {
+ type: 'none'
+ },
+ position: 'top',
+ backgroundColor: 'rgba(255, 255, 255, 0.95)',
+ borderColor: '#E5E5E5',
+ borderWidth: 1,
+ padding: [8, 12],
+ borderRadius: 4,
+ textStyle: {
+ color: '#595959',
+ fontSize: 13
+ },
+ formatter: (params: any) => {
+ let value = params.value
+ if (key.includes('Qps')) {
+ value = value.toFixed(2)
+ } else if (value >= 10000) {
+ value = (value / 10000).toFixed(1) + 'w'
+ } else {
+ value = Math.round(value)
+ }
+
+ try {
+ const date = new
Date(variables.metricsData[params.dataIndex].createTime)
+ const fullDateTime = format(date, 'yyyy-MM-dd HH:mm:ss')
+
+ return `<div style="font-family: -apple-system, BlinkMacSystemFont,
'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif">
+ <div style="color: #8c8c8c; font-size: 12px; margin-bottom: 4px">
+ ${fullDateTime}
+ </div>
+ <div style="display: flex; align-items: center">
+ <span style="display: inline-block; width: 6px; height: 6px;
border-radius: 50%; background-color: ${params.color}; margin-right:
8px"></span>
+ <span style="font-weight: 500">${value}</span>
+ </div>
+ <div style="font-size: 12px; color: #8c8c8c; margin-top: 4px">
+ ${title}
+ </div>
+ </div>`
+ } catch (err) {
+ console.error('Error formatting tooltip time:', err)
+ return ''
+ }
+ }
+ },
+ grid: {
+ top: '15%',
+ left: '3%',
+ right: '4%',
+ bottom: '3%',
+ containLabel: true
+ },
+ xAxis: {
+ type: 'category',
+ boundaryGap: false,
+ data: formatTimeData(data),
+ axisLine: {
+ lineStyle: {
+ color: '#E5E5E5'
+ }
+ },
+ axisLabel: {
+ color: '#7F7F7F',
+ formatter: (value: string) => {
+ return value.substring(value.indexOf(' ') + 1)
+ }
+ }
+ },
+ yAxis: {
+ type: 'value',
+ splitLine: {
+ lineStyle: {
+ type: 'dashed',
+ color: '#E5E5E5'
+ }
+ },
+ axisLine: {
+ show: false
+ },
+ axisTick: {
+ show: false
+ },
+ axisLabel: {
+ color: '#7F7F7F',
+ formatter: (value: number) => {
+ if (key.includes('Qps')) {
+ return value.toFixed(2)
+ }
+ if (value >= 10000) {
+ return (value / 10000).toFixed(1) + 'w'
+ }
+ return Math.round(value).toString()
+ }
+ }
+ },
+ series: [{
+ type: 'line',
+ data: data.map(item => item[key]),
+ smooth: true,
+ symbol: 'circle',
+ symbolSize: 4,
+ showSymbol: true,
+ triggerEvent: true,
+ emphasis: {
+ focus: 'series',
+ itemStyle: {
+ color: '#1890FF',
+ borderWidth: 3,
+ borderColor: '#1890FF',
+ shadowBlur: 10,
+ shadowColor: 'rgba(0, 0, 0, 0.2)'
+ }
+ },
+ itemStyle: {
+ color: '#1890FF',
+ borderWidth: 1,
+ borderColor: '#fff',
+ opacity: 0.3
+ },
+ lineStyle: {
+ width: 2
+ },
+ areaStyle: {
+ color: new echarts.graphic.LinearGradient(0, 0, 0, 1, [
+ {
+ offset: 0,
+ color: 'rgba(24,144,255,0.3)'
+ },
+ {
+ offset: 1,
+ color: 'rgba(24,144,255,0.1)'
+ }
+ ])
+ }
+ } as LineSeriesOption],
+ })
+
+ const initCharts = () => {
+ try {
+ if (variables.readRowCountChartRef) {
+ variables.readRowCountChart?.dispose()
+ variables.readRowCountChart =
echarts.init(variables.readRowCountChartRef)
+ }
+ if (variables.writeRowCountChartRef) {
+ variables.writeRowCountChart?.dispose()
+ variables.writeRowCountChart =
echarts.init(variables.writeRowCountChartRef)
+ }
+ if (variables.readQpsChartRef) {
+ variables.readQpsChart?.dispose()
+ variables.readQpsChart = echarts.init(variables.readQpsChartRef)
+ }
+ if (variables.writeQpsChartRef) {
+ variables.writeQpsChart?.dispose()
+ variables.writeQpsChart = echarts.init(variables.writeQpsChartRef)
+ }
+ if (variables.delayChartRef) {
+ variables.delayChart?.dispose()
+ variables.delayChart = echarts.init(variables.delayChartRef)
+ }
+ } catch (err) {
+ console.error('Failed to initialize charts:', err)
+ }
+ }
+
+ const getChartTitle = (key: string) => {
+ return t(`project.task.metrics.${key}`)
+ }
+
+ const updateCharts = async () => {
+ try {
+ const params: any = {
+ jobInstanceId: route.query.jobInstanceId as string
+ }
+
+ if (variables.dateRange) {
+ params.startTime = format(variables.dateRange[0], 'yyyy-MM-dd
HH:mm:ss')
+ params.endTime = format(variables.dateRange[1], 'yyyy-MM-dd HH:mm:ss')
+ }
+
+ const res = await queryJobMetricsHistory(params)
+ variables.metricsData = res
+
+ if (variables.readRowCountChart) {
+ variables.readRowCountChart.setOption(
+ getChartOption(getChartTitle('read_row_count'),
variables.metricsData, 'readRowCount')
+ )
+ }
+ if (variables.writeRowCountChart) {
+ variables.writeRowCountChart.setOption(
+ getChartOption(getChartTitle('write_row_count'),
variables.metricsData, 'writeRowCount')
+ )
+ }
+ if (variables.readQpsChart) {
+ variables.readQpsChart.setOption(
+ getChartOption(getChartTitle('read_qps'), variables.metricsData,
'readQps')
+ )
+ }
+ if (variables.writeQpsChart) {
+ variables.writeQpsChart.setOption(
+ getChartOption(getChartTitle('write_qps'), variables.metricsData,
'writeQps')
+ )
+ }
+ if (variables.delayChart) {
+ variables.delayChart.setOption(
+ getChartOption(getChartTitle('record_delay'), variables.metricsData,
'recordDelay')
+ )
+ }
+ } catch (err) {
+ console.error('Failed to fetch metrics data:', err)
+ }
+ }
+
+ const handleTimeOptionChange = (value: string) => {
+ variables.selectedTimeOption = value
+
+ if (value === 'custom') {
+ variables.showDatePicker = true
+ return
+ }
+
+ variables.showDatePicker = false
+ const option = timeOptions.find(opt => opt.value === value)
+ if (option && option.getTime) {
+ const [start, end] = option.getTime()
+ variables.dateRange = [start.getTime(), end.getTime()]
+ updateCharts()
+ }
+ }
+
+ const handleDateRangeChange = (value: [number, number] | null) => {
+ variables.dateRange = value
+ variables.selectedTimeOption = 'custom'
+ if (value) {
+ updateCharts()
+ }
+ }
+
+ onMounted(() => {
+ handleTimeOptionChange('1hour')
+ })
+
+ return {
+ variables,
+ initCharts,
+ updateCharts,
+ handleDateRangeChange,
+ handleTimeOptionChange
+ }
+}
\ No newline at end of file