This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new a5074a1d [Feature] Add task monitor chart (#281)
a5074a1d is described below

commit a5074a1de8e68b48247c7aae41426a735510cdb3
Author: Jast <[email protected]>
AuthorDate: Wed May 7 14:23:38 2025 +0800

    [Feature] Add task monitor chart (#281)
---
 .../apache/seatunnel/app/config/MonitorConfig.java |  33 ++
 .../app/controller/JobMetricsController.java       |  16 +
 .../seatunnel/app/dal/dao/IJobInstanceDao.java     |   2 +
 .../app/dal/dao/IJobMetricsHistoryDao.java         |  40 +++
 .../app/dal/dao/impl/JobInstanceDaoImpl.java       |   5 +
 .../app/dal/dao/impl/JobMetricsHistoryDaoImpl.java |  65 ++++
 .../app/dal/entity/JobMetricsHistory.java          |  89 +++++
 .../app/dal/mapper/JobInstanceMapper.java          |   5 +
 .../app/dal/mapper/JobMetricsHistoryMapper.java    |  49 +++
 .../app/scheduler/MonitorTaskScheduler.java        | 208 ++++++++++++
 .../seatunnel/app/service/IJobMetricsService.java  |  15 +
 .../app/service/impl/JobMetricsServiceImpl.java    |  33 ++
 .../seatunnel/app/dal/mapper/JobInstanceMapper.xml |   5 +
 .../app/dal/mapper/JobMetricsHistoryMapper.xml     |  97 ++++++
 .../resources/script/seatunnel_server_mysql.sql    |  29 ++
 seatunnel-ui/src/locales/en_US/project.ts          |  10 +-
 seatunnel-ui/src/locales/zh_CN/project.ts          |  24 +-
 .../src/service/sync-task-instance/index.ts        |   9 +
 seatunnel-ui/src/utils/timePickeroption.ts         |  33 ++
 .../task/synchronization-instance/detail/index.tsx |  15 +-
 .../detail/task-metrics.module.scss                |  21 ++
 .../detail/task-metrics.tsx                        | 154 +++++++++
 .../detail/use-task-metrics.ts                     | 366 +++++++++++++++++++++
 23 files changed, 1314 insertions(+), 9 deletions(-)

diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/MonitorConfig.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/MonitorConfig.java
new file mode 100644
index 00000000..a062df44
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/config/MonitorConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * Spring configuration added with the task monitor: turns on processing of {@code @Scheduled}
+ * methods and exposes a {@link RestTemplate} bean.
+ */
+@Configuration
+@EnableScheduling
+public class MonitorConfig {
+
+    /**
+     * Creates the {@link RestTemplate} bean using its no-argument constructor.
+     *
+     * <p>NOTE(review): no timeouts or interceptors are configured here.
+     *
+     * @return a new {@code RestTemplate}
+     */
+    @Bean
+    public RestTemplate restTemplate() {
+        return new RestTemplate();
+    }
+}
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobMetricsController.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobMetricsController.java
index 673f93aa..67e7044d 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobMetricsController.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobMetricsController.java
@@ -18,10 +18,13 @@
 package org.apache.seatunnel.app.controller;
 
 import org.apache.seatunnel.app.common.Result;
+import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;
 import org.apache.seatunnel.app.domain.response.metrics.JobDAG;
 import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
 import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetricsRes;
 import org.apache.seatunnel.app.service.IJobMetricsService;
+import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
+import org.apache.seatunnel.server.common.SeatunnelException;
 
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -68,4 +71,17 @@ public class JobMetricsController {
             throws IOException {
         return 
Result.success(jobMetricsService.getJobPipelineSummaryMetrics(jobInstanceId));
     }
+
+    /**
+     * Returns the stored metrics history of a job instance.
+     *
+     * <p>Delegates to {@code jobMetricsService.getJobMetricsHistory(jobInstanceId, startTime,
+     * endTime)} and wraps the result with {@code Result.success}.
+     *
+     * @param jobInstanceId id of the job instance
+     * @param startTime optional start of the time range, passed through as a string (expected
+     *     format is not visible here)
+     * @param endTime optional end of the time range, passed through as a string
+     * @return the history records wrapped in a successful {@code Result}
+     * @throws SeatunnelException with {@code SeatunnelErrorEnum.UNKNOWN} when {@code jobInstanceId}
+     *     is {@code null}
+     */
+    @GetMapping("/history")
+    public Result<List<JobMetricsHistory>> getJobMetricsHistory(
+            @RequestParam("jobInstanceId") Long jobInstanceId,
+            @RequestParam(value = "startTime", required = false) String 
startTime,
+            @RequestParam(value = "endTime", required = false) String endTime) 
{
+        if (jobInstanceId == null) {
+            throw new SeatunnelException(
+                    SeatunnelErrorEnum.UNKNOWN, "jobInstanceId cannot be 
null");
+        }
+        return Result.success(
+                jobMetricsService.getJobMetricsHistory(jobInstanceId, 
startTime, endTime));
+    }
 }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
index ec161933..6a67cbbc 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java
@@ -49,6 +49,8 @@ public interface IJobInstanceDao {
 
     List<JobInstance> getAllJobInstance(@NonNull List<Long> jobInstanceIdList);
 
+    /** Returns the job instances considered running; the selection criteria live in the mapper. */
+    List<JobInstance> getAllRunningJobInstance();
+
     JobInstance getJobExecutionStatus(@NonNull Long jobInstanceId);
 
     void deleteById(@NonNull Long jobInstanceId);
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobMetricsHistoryDao.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobMetricsHistoryDao.java
new file mode 100644
index 00000000..a00445c2
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobMetricsHistoryDao.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.dal.dao;
+
+import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;
+
+import java.util.List;
+
+/** Data-access contract for {@link JobMetricsHistory} monitoring records. */
+public interface IJobMetricsHistoryDao {
+    /** Inserts a single monitoring record. */
+    void insert(JobMetricsHistory jobMetricsHistory);
+
+    /** Inserts a batch of monitoring records. */
+    void insertBatch(List<JobMetricsHistory> jobMetricsHistories);
+
+    /** Queries the monitoring history records of the given job instance ID. */
+    List<JobMetricsHistory> getByJobInstanceId(Long jobInstanceId);
+
+    /** Queries the monitoring history records of the given job instance ID and pipeline ID. */
+    List<JobMetricsHistory> getByJobInstanceIdAndPipelineId(Long 
jobInstanceId, Integer pipelineId);
+
+    /** Queries the monitoring history of the given job instance ID within a time range. */
+    List<JobMetricsHistory> getByJobInstanceIdAndTimeRange(
+            Long jobInstanceId, String startTime, String endTime);
+}
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
index 5e9d0496..996b5551 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java
@@ -87,6 +87,11 @@ public class JobInstanceDaoImpl implements IJobInstanceDao {
                 page, startTime, endTime, jobDefineName, jobMode, 
getWorkspaceId());
     }
 
+    /** Returns the result of {@code jobInstanceMapper.getAllRunningJobInstance()} unchanged. */
+    @Override
+    public List<JobInstance> getAllRunningJobInstance() {
+        return jobInstanceMapper.getAllRunningJobInstance();
+    }
+
     @Override
     public List<JobInstance> getAllJobInstance(@NonNull List<Long> 
jobInstanceIdList) {
         return jobInstanceMapper.selectList(
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobMetricsHistoryDaoImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobMetricsHistoryDaoImpl.java
new file mode 100644
index 00000000..756317da
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobMetricsHistoryDaoImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.dal.dao.impl;
+
+import org.apache.seatunnel.app.dal.dao.IJobMetricsHistoryDao;
+import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;
+import org.apache.seatunnel.app.dal.mapper.JobMetricsHistoryMapper;
+
+import org.springframework.stereotype.Repository;
+
+import javax.annotation.Resource;
+
+import java.util.List;
+
+/**
+ * {@link IJobMetricsHistoryDao} implementation that forwards every call to {@link
+ * JobMetricsHistoryMapper}.
+ */
+@Repository
+public class JobMetricsHistoryDaoImpl implements IJobMetricsHistoryDao {
+
+    @Resource private JobMetricsHistoryMapper jobMetricsHistoryMapper;
+
+    /** Inserts one record through the mapper's {@code insert}. */
+    @Override
+    public void insert(JobMetricsHistory jobMetricsHistory) {
+        jobMetricsHistoryMapper.insert(jobMetricsHistory);
+    }
+
+    /**
+     * Inserts all given records with a single {@code insertBatchMetrics} call.
+     *
+     * <p>A {@code null} or empty list is a no-op: the mapper is not invoked for it.
+     *
+     * @param jobMetricsHistories records to insert; may be {@code null} or empty
+     */
+    @Override
+    public void insertBatch(List<JobMetricsHistory> jobMetricsHistories) {
+        if (jobMetricsHistories == null || jobMetricsHistories.isEmpty()) {
+            return;
+        }
+        jobMetricsHistoryMapper.insertBatchMetrics(jobMetricsHistories);
+    }
+
+    /** Returns the history records the mapper finds for the given job instance id. */
+    @Override
+    public List<JobMetricsHistory> getByJobInstanceId(Long jobInstanceId) {
+        return jobMetricsHistoryMapper.queryJobMetricsHistoryByInstanceId(jobInstanceId);
+    }
+
+    /** Returns the history records the mapper finds for the given job instance and pipeline. */
+    @Override
+    public List<JobMetricsHistory> getByJobInstanceIdAndPipelineId(
+            Long jobInstanceId, Integer pipelineId) {
+        return jobMetricsHistoryMapper.queryJobMetricsHistoryByInstanceIdAndPipelineId(
+                jobInstanceId, pipelineId);
+    }
+
+    /**
+     * Returns the history records the mapper finds for the given job instance and time range; the
+     * time bounds are passed through as strings.
+     */
+    @Override
+    public List<JobMetricsHistory> getByJobInstanceIdAndTimeRange(
+            Long jobInstanceId, String startTime, String endTime) {
+        return jobMetricsHistoryMapper.queryJobMetricsHistoryByInstanceIdAndTimeRange(
+                jobInstanceId, startTime, endTime);
+    }
+}
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetricsHistory.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetricsHistory.java
new file mode 100644
index 00000000..abeb565c
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobMetricsHistory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.app.dal.entity;
+
+import org.apache.seatunnel.engine.core.job.JobStatus;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+/**
+ * Entity mapped to the {@code t_st_job_metrics_history} table.
+ *
+ * <p>We will expand the JobMetricsHistory metric in the future, so we did not use the JobMetrics
+ * table.
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@TableName("t_st_job_metrics_history")
+public class JobMetricsHistory {
+
+    // IdType.INPUT: the id value is supplied by the caller before the insert.
+    @TableId(value = "id", type = IdType.INPUT)
+    private Long id;
+
+    @TableField("job_instance_id")
+    private Long jobInstanceId;
+
+    @TableField("pipeline_id")
+    private Integer pipelineId;
+
+    @TableField("read_row_count")
+    private long readRowCount;
+
+    @TableField("write_row_count")
+    private long writeRowCount;
+
+    @TableField("source_table_names")
+    private String sourceTableNames;
+
+    @TableField("sink_table_names")
+    private String sinkTableNames;
+
+    @TableField("read_qps")
+    private long readQps;
+
+    @TableField("write_qps")
+    private long writeQps;
+
+    @TableField("record_delay")
+    private long recordDelay;
+
+    @TableField("status")
+    private JobStatus status;
+
+    @TableField("create_user_id")
+    private Integer createUserId;
+
+    @TableField("update_user_id")
+    private Integer updateUserId;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "UTC")
+    @TableField("create_time")
+    private LocalDateTime createTime;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "UTC")
+    @TableField("update_time")
+    private LocalDateTime updateTime;
+}
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
index fb013a46..1270c524 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.java
@@ -29,6 +29,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 
 import java.util.Date;
+import java.util.List;
 
 @Mapper
 public interface JobInstanceMapper extends BaseMapper<JobInstance> {
@@ -39,4 +40,8 @@ public interface JobInstanceMapper extends 
BaseMapper<JobInstance> {
             @Param("jobDefineName") String jobDefineName,
             @Param("jobMode") JobMode jobMode,
             @Param("workspaceId") Long workspaceId);
+
+    /** Looks up one job instance by id; the SQL is defined outside this interface. */
+    JobInstance getJobExecutionStatus(@Param("jobInstanceId") Long 
jobInstanceId);
+
+    /** Lists the job instances considered running; the SQL is defined outside this interface. */
+    List<JobInstance> getAllRunningJobInstance();
 }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobMetricsHistoryMapper.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobMetricsHistoryMapper.java
new file mode 100644
index 00000000..d3af85eb
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/mapper/JobMetricsHistoryMapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.dal.mapper;
+
+import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;
+
+import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+import java.util.List;
+
+/** MyBatis mapper for {@link JobMetricsHistory} records. */
+@Mapper
+public interface JobMetricsHistoryMapper extends BaseMapper<JobMetricsHistory> 
{
+
+    /** Inserts a batch of monitoring history records. */
+    void insertBatchMetrics(
+            @Param("jobMetricsHistories") List<JobMetricsHistory> 
jobMetricsHistories);
+
+    /** Queries the monitoring history of the given job instance ID. */
+    List<JobMetricsHistory> queryJobMetricsHistoryByInstanceId(
+            @Param("jobInstanceId") Long jobInstanceId);
+
+    /** Queries the monitoring history of the given job instance ID and pipeline ID. */
+    List<JobMetricsHistory> queryJobMetricsHistoryByInstanceIdAndPipelineId(
+            @Param("jobInstanceId") Long jobInstanceId, @Param("pipelineId") 
Integer pipelineId);
+
+    /** Queries the monitoring history of the given job instance ID within a time range. */
+    List<JobMetricsHistory> queryJobMetricsHistoryByInstanceIdAndTimeRange(
+            @Param("jobInstanceId") Long jobInstanceId,
+            @Param("startTime") String startTime,
+            @Param("endTime") String endTime);
+}
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/scheduler/MonitorTaskScheduler.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/scheduler/MonitorTaskScheduler.java
new file mode 100644
index 00000000..de30281f
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/scheduler/MonitorTaskScheduler.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.scheduler;
+
+import 
org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.seatunnel.app.dal.dao.IJobInstanceDao;
+import org.apache.seatunnel.app.dal.dao.IJobMetricsHistoryDao;
+import org.apache.seatunnel.app.dal.entity.JobInstance;
+import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;
+import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
+import org.apache.seatunnel.app.service.IJobMetricsService;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Component
+public class MonitorTaskScheduler {
+
+    private final ExecutorService executorService;
+
+    @Resource private IJobInstanceDao jobInstanceDao;
+
+    @Resource private IJobMetricsService jobMetricsService;
+
+    @Resource private IJobMetricsHistoryDao jobMetricsHistoryDao;
+
+    private final ConcurrentHashMap<Long, JobInstance> jobInstanceMap = new 
ConcurrentHashMap<>();
+
+    private final Object mapLock = new Object();
+
+    public MonitorTaskScheduler() {
+        // Create thread pool
+        this.executorService =
+                new ThreadPoolExecutor(
+                        5,
+                        10,
+                        60L,
+                        TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<>(100),
+                        new ThreadFactoryBuilder()
+                                .setNameFormat("task-processor-%d")
+                                .setUncaughtExceptionHandler(
+                                        (t, e) ->
+                                                log.error(
+                                                        "Thread {} encountered 
uncaught exception",
+                                                        t.getName(),
+                                                        e))
+                                .build(),
+                        new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    @Scheduled(initialDelay = 0, fixedRate = 60000)
+    public void updateJobInstance() {
+        try {
+            log.info("Start updating job instance information...");
+            List<JobInstance> allJobInstance = 
jobInstanceDao.getAllRunningJobInstance();
+
+            Map<Long, JobInstance> newInstanceMap =
+                    allJobInstance.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            JobInstance::getId,
+                                            instance -> instance,
+                                            (existing, replacement) -> 
replacement));
+
+            synchronized (mapLock) {
+                jobInstanceMap.clear();
+                jobInstanceMap.putAll(newInstanceMap);
+            }
+
+            log.debug(
+                    "Job instance information updated, current total 
instances: {}",
+                    jobInstanceMap.size());
+        } catch (Exception e) {
+            log.error("Error updating job instance information", e);
+        }
+    }
+
+    public JobInstance getJobInstance(Long jobInstanceId) {
+        synchronized (mapLock) {
+            return jobInstanceMap.get(jobInstanceId);
+        }
+    }
+
+    public List<JobInstance> getAllJobInstances() {
+        synchronized (mapLock) {
+            return new ArrayList<>(jobInstanceMap.values());
+        }
+    }
+
+    @Scheduled(fixedDelay = 5000)
+    public void scheduleTasks() {
+        List<JobInstance> instances;
+        synchronized (mapLock) {
+            instances = new ArrayList<>(jobInstanceMap.values());
+        }
+
+        instances.forEach(
+                jobInstance -> {
+                    if (jobInstance.getJobStatus() != JobStatus.RUNNING) {
+                        return;
+                    }
+                    try {
+                        executorService.submit(
+                                () -> {
+                                    try {
+                                        Long jobInstanceId = 
jobInstance.getId();
+                                        List<JobPipelineDetailMetricsRes> 
metricsResList =
+                                                
jobMetricsService.getJobPipelineDetailMetricsRes(
+                                                        jobInstance);
+
+                                        if (metricsResList != null && 
!metricsResList.isEmpty()) {
+                                            List<JobMetricsHistory> 
historyList =
+                                                    metricsResList.stream()
+                                                            .map(
+                                                                    metrics ->
+                                                                            
convertToJobMetricsHistory(
+                                                                               
     metrics,
+                                                                               
     jobInstanceId))
+                                                            
.collect(Collectors.toList());
+
+                                            
jobMetricsHistoryDao.insertBatch(historyList);
+                                            log.debug(
+                                                    "Successfully saved 
metrics for job {}, total {} records",
+                                                    jobInstanceId,
+                                                    historyList.size());
+                                        }
+                                    } catch (Exception e) {
+                                        log.error("Error saving job metrics", 
e);
+                                    }
+                                });
+                    } catch (Exception e) {
+                        log.error("Task scheduling error", e);
+                    }
+                });
+    }
+
+    private JobMetricsHistory convertToJobMetricsHistory(
+            JobPipelineDetailMetricsRes metrics, Long jobInstanceId) {
+        return JobMetricsHistory.builder()
+                .id(generateId())
+                .jobInstanceId(jobInstanceId)
+                .pipelineId(metrics.getPipelineId())
+                .readRowCount(metrics.getReadRowCount())
+                .writeRowCount(metrics.getWriteRowCount())
+                .sourceTableNames(metrics.getSourceTableNames())
+                .sinkTableNames(metrics.getSinkTableNames())
+                .readQps(metrics.getReadQps())
+                .writeQps(metrics.getWriteQps())
+                .recordDelay(metrics.getRecordDelay())
+                .status(metrics.getStatus())
+                .createUserId(-1)
+                .updateUserId(-1)
+                .build();
+    }
+
+    private Long generateId() {
+        // Here you can use a distributed ID generator, such as Snowflake 
algorithm
+        return System.currentTimeMillis();
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        log.info("Shutting down task scheduler...");
+        executorService.shutdown();
+        try {
+            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executorService.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
index 4fbaca40..20324af0 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.app.service;
 
 import org.apache.seatunnel.app.dal.entity.JobInstance;
+import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;
 import org.apache.seatunnel.app.domain.response.metrics.JobDAG;
 import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
 import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetricsRes;
@@ -40,6 +41,9 @@ public interface IJobMetricsService {
 
     JobDAG getJobDAG(@NonNull Long jobInstanceId) throws 
JsonProcessingException;
 
+    /**
+     * Returns the pipeline detail metrics of the given job instance.
+     *
+     * @param jobInstance the job instance to read metrics for; must not be {@code null}
+     * @return the detail metrics of the job's pipelines
+     */
+    List<JobPipelineDetailMetricsRes> getJobPipelineDetailMetricsRes(
+            @NonNull JobInstance jobInstance);
+
     ImmutablePair<Long, String> getInstanceIdAndEngineId(@NonNull String key);
 
     void syncJobDataToDb(@NonNull JobInstance jobInstance, @NonNull String 
jobEngineId);
@@ -51,4 +55,15 @@ public interface IJobMetricsService {
             @NonNull Map<Long, Long> jobInstanceIdAndJobEngineIdMap,
             @NonNull List<Long> jobInstanceIdList,
             @NonNull JobMode jobMode);
+
+    /**
+     * Get job metrics history data
+     *
+     * @param jobInstanceId job instance id
+     * @return List of metrics data points
+     */
+    List<JobMetricsHistory> getJobMetricsHistory(@NonNull Long jobInstanceId);
+
+    /**
+     * Get job metrics history data, optionally restricted to a time range.
+     *
+     * @param jobInstanceId job instance id
+     * @param startTime start of the time range as a string; may be empty or {@code null}
+     * @param endTime end of the time range as a string; may be empty or {@code null}
+     * @return List of metrics data points
+     */
+    List<JobMetricsHistory> getJobMetricsHistory(
+            Long jobInstanceId, String startTime, String endTime);
 }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
index 0d9bad0f..9637b277 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.app.dal.dao.IJobMetricsDao;
 import org.apache.seatunnel.app.dal.entity.JobInstance;
 import org.apache.seatunnel.app.dal.entity.JobInstanceHistory;
 import org.apache.seatunnel.app.dal.entity.JobMetrics;
+import org.apache.seatunnel.app.dal.entity.JobMetricsHistory;
+import org.apache.seatunnel.app.dal.mapper.JobMetricsHistoryMapper;
 import org.apache.seatunnel.app.domain.response.engine.Engine;
 import org.apache.seatunnel.app.domain.response.metrics.JobDAG;
 import 
org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
@@ -47,6 +49,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.dao.DuplicateKeyException;
 import org.springframework.stereotype.Service;
 
@@ -72,6 +75,8 @@ public class JobMetricsServiceImpl extends 
SeatunnelBaseServiceImpl implements I
 
     @Resource private IJobInstanceDao jobInstanceDao;
 
+    @Autowired private JobMetricsHistoryMapper jobMetricsHistoryMapper;
+
     @Override
     public List<JobPipelineSummaryMetricsRes> getJobPipelineSummaryMetrics(
             @NonNull Long jobInstanceId) {
@@ -474,6 +479,19 @@ public class JobMetricsServiceImpl extends 
SeatunnelBaseServiceImpl implements I
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Collects the current per-pipeline metrics of a job instance from the engine.
+     *
+     * <p>A job instance that has already reached an end status yields an empty list and the
+     * engine is not contacted.
+     *
+     * @param jobInstance the job instance to read metrics for; must not be null
+     * @return one wrapped metrics entry per engine metrics record, or an empty list for an ended
+     *     job
+     */
+    @Override
+    public List<JobPipelineDetailMetricsRes> getJobPipelineDetailMetricsRes(
+            @NonNull JobInstance jobInstance) {
+        List<JobPipelineDetailMetricsRes> wrappedMetrics = new ArrayList<>();
+        if (JobUtils.isJobEndStatus(jobInstance.getJobStatus())) {
+            return wrappedMetrics;
+        }
+        for (JobMetrics engineMetrics :
+                getJobMetricsFromEngine(jobInstance, jobInstance.getJobEngineId())) {
+            wrappedMetrics.add(wrapperJobMetrics(engineMetrics));
+        }
+        return wrappedMetrics;
+    }
+
     @Override
     public JobDAG getJobDAG(@NonNull Long jobInstanceId) {
         int userId = ServletUtils.getCurrentUserId();
@@ -683,4 +701,19 @@ public class JobMetricsServiceImpl extends 
SeatunnelBaseServiceImpl implements I
             jobMetricsDao.getJobMetricsMapper().insertBatchMetrics(list);
         }
     }
+
+    @Override
+    @NonNull public List<JobMetricsHistory> getJobMetricsHistory(@NonNull Long 
jobInstanceId) {
+        return 
jobMetricsHistoryMapper.queryJobMetricsHistoryByInstanceId(jobInstanceId);
+    }
+
+    @Override
+    public List<JobMetricsHistory> getJobMetricsHistory(
+            Long jobInstanceId, String startTime, String endTime) {
+        if (StringUtils.isNotEmpty(startTime) && 
StringUtils.isNotEmpty(endTime)) {
+            return 
jobMetricsHistoryMapper.queryJobMetricsHistoryByInstanceIdAndTimeRange(
+                    jobInstanceId, startTime, endTime);
+        }
+        return getJobMetricsHistory(jobInstanceId);
+    }
 }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
 
b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
index 0978573d..f7b81555 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
+++ 
b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml
@@ -57,4 +57,9 @@
         </where>
         ORDER BY ji.create_time DESC
     </select>
+    <!-- Returns every job instance whose job_status is 'RUNNING'. -->
+    <select id="getAllRunningJobInstance" 
resultType="org.apache.seatunnel.app.dal.entity.JobInstance">
+        SELECT <include refid="Base_Column_List"/>
+        FROM t_st_job_instance t
+        WHERE t.job_status = 'RUNNING'
+    </select>
 </mapper>
diff --git 
a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobMetricsHistoryMapper.xml
 
b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobMetricsHistoryMapper.xml
new file mode 100644
index 00000000..1cda1ada
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobMetricsHistoryMapper.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd";>
+<mapper namespace="org.apache.seatunnel.app.dal.mapper.JobMetricsHistoryMapper">
+    <!-- Column-to-property mapping shared by every query on t_st_job_metrics_history. -->
+    <resultMap id="BaseResultMap" type="org.apache.seatunnel.app.dal.entity.JobMetricsHistory">
+        <id column="id" jdbcType="BIGINT" property="id"/>
+        <result column="job_instance_id" jdbcType="BIGINT" property="jobInstanceId"/>
+        <result column="pipeline_id" jdbcType="INTEGER" property="pipelineId"/>
+        <result column="read_row_count" jdbcType="BIGINT" property="readRowCount"/>
+        <result column="write_row_count" jdbcType="BIGINT" property="writeRowCount"/>
+        <result column="source_table_names" jdbcType="VARCHAR" property="sourceTableNames"/>
+        <result column="sink_table_names" jdbcType="VARCHAR" property="sinkTableNames"/>
+        <result column="read_qps" jdbcType="BIGINT" property="readQps"/>
+        <result column="write_qps" jdbcType="BIGINT" property="writeQps"/>
+        <result column="record_delay" jdbcType="BIGINT" property="recordDelay"/>
+        <result column="status" jdbcType="VARCHAR" property="status"/>
+        <result column="create_user_id" jdbcType="INTEGER" property="createUserId"/>
+        <result column="update_user_id" jdbcType="INTEGER" property="updateUserId"/>
+        <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+        <result column="update_time" jdbcType="TIMESTAMP" property="updateTime"/>
+    </resultMap>
+
+    <!-- All columns of t_st_job_metrics_history, in the order insertBatchMetrics binds them. -->
+    <sql id="Base_Column_List">
+        id, job_instance_id, pipeline_id, read_row_count, write_row_count,
+        source_table_names, sink_table_names, read_qps, write_qps, record_delay,
+        status, create_user_id, update_user_id, create_time, update_time
+    </sql>
+
+    <!--
+        Inserts all given history rows with one multi-row INSERT.
+        The collection must not be empty: with no elements the statement has no VALUES rows.
+    -->
+    <insert id="insertBatchMetrics">
+        insert into t_st_job_metrics_history (
+            <include refid="Base_Column_List"/>
+        )
+        values
+        <foreach collection="jobMetricsHistories" item="metrics" separator=",">
+            (
+                #{metrics.id},
+                #{metrics.jobInstanceId},
+                #{metrics.pipelineId},
+                #{metrics.readRowCount},
+                #{metrics.writeRowCount},
+                #{metrics.sourceTableNames},
+                #{metrics.sinkTableNames},
+                #{metrics.readQps},
+                #{metrics.writeQps},
+                #{metrics.recordDelay},
+                #{metrics.status},
+                #{metrics.createUserId},
+                #{metrics.updateUserId},
+                #{metrics.createTime},
+                #{metrics.updateTime}
+            )
+        </foreach>
+    </insert>
+
+    <!-- All history rows of a job instance, oldest first. -->
+    <select id="queryJobMetricsHistoryByInstanceId" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from t_st_job_metrics_history
+        where job_instance_id = #{jobInstanceId}
+        order by create_time asc
+    </select>
+
+    <!-- History rows of one pipeline of a job instance, newest first. -->
+    <select id="queryJobMetricsHistoryByInstanceIdAndPipelineId" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from t_st_job_metrics_history
+        where job_instance_id = #{jobInstanceId}
+        and pipeline_id = #{pipelineId}
+        order by create_time desc
+    </select>
+
+    <!--
+        History rows of a job instance inside an inclusive create_time range, oldest first.
+        Each bound is optional: a null or empty bound adds no condition.
+        Uses the shared column list and result map like the other queries, so the mapping does
+        not depend on global underscore-to-camel-case auto-mapping.
+    -->
+    <select id="queryJobMetricsHistoryByInstanceIdAndTimeRange" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from t_st_job_metrics_history
+        where job_instance_id = #{jobInstanceId}
+        <if test="startTime != null and startTime != ''">
+            and create_time &gt;= #{startTime}
+        </if>
+        <if test="endTime != null and endTime != ''">
+            and create_time &lt;= #{endTime}
+        </if>
+        order by create_time asc
+    </select>
+</mapper>
\ No newline at end of file
diff --git 
a/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
 
b/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
index 5dd91e4c..e3660fba 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
+++ 
b/seatunnel-server/seatunnel-app/src/main/resources/script/seatunnel_server_mysql.sql
@@ -284,3 +284,32 @@ CREATE TABLE `workspace`  (
 ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = 
utf8mb4_general_ci ROW_FORMAT = Dynamic;
 
 INSERT INTO `seatunnel`.`workspace`(`workspace_name`,`description`) values 
('default', 'default workspace');
+
+
+DROP TABLE IF EXISTS `t_st_job_metrics_history`;
+-- Metrics history samples per job instance and pipeline; written and read through
+-- JobMetricsHistoryMapper. `id` has no AUTO_INCREMENT because the insert statement
+-- supplies it explicitly.
+create table t_st_job_metrics_history
+(
+    id                 bigint                                    not null
+        primary key,
+    job_instance_id    bigint                                    not null,
+    pipeline_id        int(20)                                   not null,
+    read_row_count     bigint                                    not null,
+    write_row_count    bigint                                    not null,
+    source_table_names varchar(200)                              null,
+    sink_table_names   varchar(200)                              null,
+    read_qps           bigint                                    null,
+    write_qps          bigint                                    null,
+    record_delay       bigint                                    null,
+    status             varchar(20)                               null,
+    create_user_id     int(20)                                   not null,
+    update_user_id     int(20)                                   null,
+    create_time        timestamp(3) default CURRENT_TIMESTAMP(3) not null,
+    update_time        timestamp(3) default CURRENT_TIMESTAMP(3) not null on 
update CURRENT_TIMESTAMP(3)
+)
+    collate = utf8mb4_bin
+    row_format = DYNAMIC;
+
+-- Serves the history queries, which filter on job_instance_id and filter/sort on create_time.
+create index idx_job_instance_id_create_time
+    on t_st_job_metrics_history (job_instance_id, create_time);
+
diff --git a/seatunnel-ui/src/locales/en_US/project.ts 
b/seatunnel-ui/src/locales/en_US/project.ts
index b6bc46a6..d137c9e4 100644
--- a/seatunnel-ui/src/locales/en_US/project.ts
+++ b/seatunnel-ui/src/locales/en_US/project.ts
@@ -293,6 +293,13 @@ export default {
       'The following workflow do not meet the execution conditions'
   },
   task: {
+    metrics: {
+      read_row_count: 'Read Data Trend',
+      write_row_count: 'Write Data Trend',
+      read_qps: 'Read QPS Trend',
+      write_qps: 'Write QPS Trend',
+      record_delay: 'Data Delay Trend(s)'
+    },
     cancel_full_screen: 'Cancel full screen',
     enter_full_screen: 'Enter full screen',
     current_task_settings: 'Current task settings',
@@ -1146,7 +1153,8 @@ export default {
     cancel: 'Cancel',
     delete: 'Delete',
     delete_confirm: 'Delete?',
-    error_message: 'Error'
+    error_message: 'Error',
+    task_metrics: 'Task Monitor'
   },
   menu: {
     fav: 'Favorites',
diff --git a/seatunnel-ui/src/locales/zh_CN/project.ts 
b/seatunnel-ui/src/locales/zh_CN/project.ts
index 3e059f18..c69e6f3d 100644
--- a/seatunnel-ui/src/locales/zh_CN/project.ts
+++ b/seatunnel-ui/src/locales/zh_CN/project.ts
@@ -290,6 +290,13 @@ export default {
     dependent_chain_condition_title: '以下工作流不满足执行条件'
   },
   task: {
+    metrics: {
+      read_row_count: '读取数据量趋势',
+      write_row_count: '写入数据量趋势',
+      read_qps: '读取QPS趋势',
+      write_qps: '写入QPS趋势',
+      record_delay: '数据延迟趋势(秒)'
+    },
     cancel_full_screen: '取消全屏',
     enter_full_screen: '全屏',
     current_task_settings: '当前任务设置',
@@ -1113,7 +1120,8 @@ export default {
     confirm: '确定',
     cancel: '取消',
     delete: '删除',
-    delete_confirm: '确定删除吗?'
+    delete_confirm: '确定删除吗?',
+    task_metrics: '任务监控'
   },
   menu: {
     fav: '收藏组件',
@@ -1147,5 +1155,17 @@ export default {
   all_project: '全部项目',
   next_step: '下一步',
   pre_step: '上一步',
-  project_name: '项目名称'
+  project_name: '项目名称',
+  metrics: {
+    last_1_minute: '最近1分钟',
+    last_10_minutes: '最近10分钟',
+    last_1_hour: '最近1小时',
+    last_3_hours: '最近3小时',
+    last_1_day: '最近1天',
+    last_7_days: '最近7天',
+    custom_time: '自定义时间',
+    start_time: '开始时间',
+    end_time: '结束时间',
+    metrics_title: '任务指标'
+  }
 }
diff --git a/seatunnel-ui/src/service/sync-task-instance/index.ts 
b/seatunnel-ui/src/service/sync-task-instance/index.ts
index c6c98a01..b39b05ac 100644
--- a/seatunnel-ui/src/service/sync-task-instance/index.ts
+++ b/seatunnel-ui/src/service/sync-task-instance/index.ts
@@ -89,3 +89,12 @@ export function hanldleDelJob(id: number): any {
     method: 'delete'
   })
 }
+
+/**
+ * Fetch the metrics history of a job instance from `/job/metrics/history`.
+ *
+ * `startTime` and `endTime` are optional and, when given, are sent as query
+ * parameters to restrict the result to a time range (the metrics view sends
+ * them formatted as `yyyy-MM-dd HH:mm:ss`).
+ */
+export function queryJobMetricsHistory(params: {
+  jobInstanceId: string | number
+  startTime?: string
+  endTime?: string
+}): any {
+  return axios({
+    url: '/job/metrics/history',
+    method: 'get',
+    timeout: 60000,
+    params
+  })
+}
diff --git a/seatunnel-ui/src/utils/timePickeroption.ts 
b/seatunnel-ui/src/utils/timePickeroption.ts
index a78c9418..cae5d3de 100644
--- a/seatunnel-ui/src/utils/timePickeroption.ts
+++ b/seatunnel-ui/src/utils/timePickeroption.ts
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 
+import { format, subDays, subHours, subMinutes } from 'date-fns'
+
 export const getNowDate = (): any => {
   return [
     new Date(new Date().toLocaleDateString()).getTime(),
@@ -54,3 +56,34 @@ export function getRangeShortCuts(t: any) {
   })
   return rangeShortCuts
 }
+
+/**
+ * Build the "last N minutes/hours/days" range shortcuts for the metrics views.
+ *
+ * Each entry has a translated `label` and a `value` callback returning
+ * `[startMillis, endMillis]`. The current time is read inside the callback,
+ * i.e. when the shortcut is actually used, so a shortcut list that was built
+ * earlier still yields a range ending "now" instead of at build time.
+ *
+ * @param t i18n translate function
+ */
+export const getMetricsRangeShortcuts = (t: any) => {
+  // Returns a callback producing [now - amount, now] with a fresh "now" per call.
+  const rangeEndingNow =
+    (subtract: (date: Date, amount: number) => Date, amount: number) =>
+    () => {
+      const now = new Date()
+      return [subtract(now, amount).getTime(), now.getTime()]
+    }
+
+  return [
+    {
+      label: t('project.metrics.last_1_minute'),
+      value: rangeEndingNow(subMinutes, 1)
+    },
+    {
+      label: t('project.metrics.last_10_minutes'),
+      value: rangeEndingNow(subMinutes, 10)
+    },
+    {
+      label: t('project.metrics.last_1_hour'),
+      value: rangeEndingNow(subHours, 1)
+    },
+    {
+      label: t('project.metrics.last_3_hours'),
+      value: rangeEndingNow(subHours, 3)
+    },
+    {
+      label: t('project.metrics.last_1_day'),
+      value: rangeEndingNow(subDays, 1)
+    },
+    {
+      label: t('project.metrics.last_7_days'),
+      value: rangeEndingNow(subDays, 7)
+    }
+  ]
+}
diff --git 
a/seatunnel-ui/src/views/task/synchronization-instance/detail/index.tsx 
b/seatunnel-ui/src/views/task/synchronization-instance/detail/index.tsx
index 7174070c..30abfc5c 100644
--- a/seatunnel-ui/src/views/task/synchronization-instance/detail/index.tsx
+++ b/seatunnel-ui/src/views/task/synchronization-instance/detail/index.tsx
@@ -18,6 +18,7 @@
 import { defineComponent } from 'vue'
 import { RunningInstance } from './running-instance'
 import { TaskDefinition } from './task-definition'
+import { TaskMetrics } from './task-metrics'
 import {
   NBreadcrumb,
   NBreadcrumbItem,
@@ -70,20 +71,22 @@ const SynchronizationInstanceDetail = defineComponent({
         <NTabs type='segment'>
           <NTabPane
             name='task-definition'
-            tab={this.t(
-              'project.synchronization_instance.sync_task_definition'
-            )}
+            
tab={this.t('project.synchronization_instance.sync_task_definition')}
           >
             <TaskDefinition />
           </NTabPane>
           <NTabPane
             name='running-instance'
-            tab={this.t(
-              'project.synchronization_instance.data_pipeline_running_instance'
-            )}
+            
tab={this.t('project.synchronization_instance.data_pipeline_running_instance')}
           >
             <RunningInstance />
           </NTabPane>
+          <NTabPane
+            name='task-metrics'
+            tab={this.t('project.synchronization_instance.task_metrics')}
+          >
+            <TaskMetrics />
+          </NTabPane>
         </NTabs>
       </NSpace>
     )
diff --git 
a/seatunnel-ui/src/views/task/synchronization-instance/detail/task-metrics.module.scss
 
b/seatunnel-ui/src/views/task/synchronization-instance/detail/task-metrics.module.scss
new file mode 100644
index 00000000..14dc69d1
--- /dev/null
+++ 
b/seatunnel-ui/src/views/task/synchronization-instance/detail/task-metrics.module.scss
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Size of each chart container <div> rendered by the task metrics component. */
+.chart {
+  height: 400px;
+  width: 100%;
+} 
\ No newline at end of file
diff --git 
a/seatunnel-ui/src/views/task/synchronization-instance/detail/task-metrics.tsx 
b/seatunnel-ui/src/views/task/synchronization-instance/detail/task-metrics.tsx
new file mode 100644
index 00000000..f5e81c93
--- /dev/null
+++ 
b/seatunnel-ui/src/views/task/synchronization-instance/detail/task-metrics.tsx
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { defineComponent, onMounted, onUnmounted, nextTick } from 'vue'
+import { NCard, NGrid, NGi, NSpace, NDatePicker, NSelect } from 'naive-ui'
+import { useTaskMetrics } from './use-task-metrics'
+import styles from './task-metrics.module.scss'
+import { useI18n } from 'vue-i18n'
+
+const TaskMetrics = defineComponent({
+  name: 'TaskMetrics',
+  /**
+   * Wires the metrics charts to the component lifecycle: once mounted (and
+   * after the next DOM tick) the charts are created and filled, then refreshed
+   * every 10 seconds; on unmount the refresh timer is stopped and every chart
+   * instance is disposed.
+   */
+  setup() {
+    const {
+      variables,
+      initCharts,
+      updateCharts,
+      handleDateRangeChange,
+      handleTimeOptionChange
+    } = useTaskMetrics()
+    let refreshTimer: ReturnType<typeof setInterval>
+    const { t } = useI18n()
+
+    onMounted(async () => {
+      // Wait one tick so the chart container refs are bound before init.
+      await nextTick()
+      initCharts()
+      updateCharts()
+      refreshTimer = setInterval(() => {
+        updateCharts()
+      }, 10000)
+    })
+
+    onUnmounted(() => {
+      if (refreshTimer) {
+        clearInterval(refreshTimer)
+      }
+      const chartInstances = [
+        variables.readRowCountChart,
+        variables.writeRowCountChart,
+        variables.readQpsChart,
+        variables.writeQpsChart,
+        variables.delayChart
+      ]
+      chartInstances.forEach((chart) => chart?.dispose())
+    })
+
+    return {
+      variables,
+      handleDateRangeChange,
+      handleTimeOptionChange,
+      t
+    }
+  },
+  /**
+   * Renders one card holding the time-range controls in its header and a
+   * two-column grid of chart containers in its body: read/write row count,
+   * read/write QPS, and a delay chart spanning both columns. Each container
+   * <div> stores its element into the matching `variables.*ChartRef`.
+   */
+  render() {
+    return (
+      <NGrid x-gap={12} cols={2}>
+        <NGi span={2}>
+          <NCard
+            title={this.t('project.metrics.metrics_title')}
+            headerStyle={{ padding: '16px 20px' }}
+            contentStyle={{ padding: '4px 20px 20px' }}
+          >
+            {{
+              // Header slot: title plus the time-option select; the custom
+              // date-range picker is shown only while variables.showDatePicker is true.
+              header: () => (
+                <NSpace justify="space-between" align="center" style="width: 
100%">
+                  <span class="n-card-header__main">
+                    {this.t('project.metrics.metrics_title')}
+                  </span>
+                  <NSpace align="center">
+                    <NSelect
+                      value={this.variables.selectedTimeOption}
+                      options={this.variables.timeOptions}
+                      onUpdateValue={this.handleTimeOptionChange}
+                      style={{ width: '150px' }}
+                    />
+                    {this.variables.showDatePicker && (
+                      <NDatePicker
+                        type="datetimerange"
+                        value={this.variables.dateRange}
+                        onUpdateValue={this.handleDateRangeChange}
+                        clearable
+                        defaultTime={['00:00:00', '23:59:59']}
+                        valueFormat="timestamp"
+                        actions={['clear', 'confirm']}
+                        style={{ width: '320px' }}
+                        placeholder={[
+                          this.t('project.metrics.start_time'),
+                          this.t('project.metrics.end_time')
+                        ]}
+                        placement="bottom-end"
+                        size="small"
+                        to={false}
+                      />
+                    )}
+                  </NSpace>
+                </NSpace>
+              ),
+              // Default slot: the five chart containers.
+              default: () => (
+                <NGrid x-gap={12} y-gap={12} cols={2}>
+                  <NGi>
+                    <NCard>
+                      <div
+                        ref={(el) => (this.variables.readRowCountChartRef = 
el)}
+                        class={styles.chart}
+                      />
+                    </NCard>
+                  </NGi>
+                  <NGi>
+                    <NCard>
+                      <div
+                        ref={(el) => (this.variables.writeRowCountChartRef = 
el)}
+                        class={styles.chart}
+                      />
+                    </NCard>
+                  </NGi>
+                  <NGi>
+                    <NCard>
+                      <div
+                        ref={(el) => (this.variables.readQpsChartRef = el)}
+                        class={styles.chart}
+                      />
+                    </NCard>
+                  </NGi>
+                  <NGi>
+                    <NCard>
+                      <div
+                        ref={(el) => (this.variables.writeQpsChartRef = el)}
+                        class={styles.chart}
+                      />
+                    </NCard>
+                  </NGi>
+                  <NGi span={2}>
+                    <NCard>
+                      <div
+                        ref={(el) => (this.variables.delayChartRef = el)}
+                        class={styles.chart}
+                      />
+                    </NCard>
+                  </NGi>
+                </NGrid>
+              )
+            }}
+          </NCard>
+        </NGi>
+      </NGrid>
+    )
+  }
+})
+
+export { TaskMetrics } 
\ No newline at end of file
diff --git 
a/seatunnel-ui/src/views/task/synchronization-instance/detail/use-task-metrics.ts
 
b/seatunnel-ui/src/views/task/synchronization-instance/detail/use-task-metrics.ts
new file mode 100644
index 00000000..b2636335
--- /dev/null
+++ 
b/seatunnel-ui/src/views/task/synchronization-instance/detail/use-task-metrics.ts
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive, ref, onMounted } from 'vue'
+import type { EChartsOption, LineSeriesOption } from 'echarts'
+import * as echarts from 'echarts'
+import { useRoute } from 'vue-router'
+import { useI18n } from 'vue-i18n'
+import { format, subDays, subHours, subMinutes } from 'date-fns'
+import { 
+  queryJobMetricsHistory,
+  querySyncTaskInstanceDag,
+  querySyncTaskInstanceDetail,
+  queryRunningInstancePaging 
+} from '@/service/sync-task-instance'
+
+export function useTaskMetrics() {
+  const route = useRoute()
+  const { t } = useI18n()
+  
+  const timeOptions = [
+    {
+      label: t('project.metrics.last_1_minute'),
+      value: '1min',
+      getTime: () => [subMinutes(new Date(), 1), new Date()]
+    },
+    {
+      label: t('project.metrics.last_10_minutes'),
+      value: '10min',
+      getTime: () => [subMinutes(new Date(), 10), new Date()]
+    },
+    {
+      label: t('project.metrics.last_1_hour'),
+      value: '1hour',
+      getTime: () => [subHours(new Date(), 1), new Date()]
+    },
+    {
+      label: t('project.metrics.last_3_hours'),
+      value: '3hours',
+      getTime: () => [subHours(new Date(), 3), new Date()]
+    },
+    {
+      label: t('project.metrics.last_1_day'),
+      value: '1day',
+      getTime: () => [subDays(new Date(), 1), new Date()]
+    },
+    {
+      label: t('project.metrics.last_7_days'),
+      value: '7days',
+      getTime: () => [subDays(new Date(), 7), new Date()]
+    },
+    {
+      label: t('project.metrics.custom_time'),
+      value: 'custom'
+    }
+  ]
+
+  const variables = reactive({
+    readRowCountChartRef: ref(),
+    writeRowCountChartRef: ref(),
+    readQpsChartRef: ref(),
+    writeQpsChartRef: ref(),
+    delayChartRef: ref(),
+    readRowCountChart: null as echarts.ECharts | null,
+    writeRowCountChart: null as echarts.ECharts | null,
+    readQpsChart: null as echarts.ECharts | null,
+    writeQpsChart: null as echarts.ECharts | null,
+    delayChart: null as echarts.ECharts | null,
+    metricsData: [] as any[],
+    dateRange: null as [number, number] | null,
+    selectedTimeOption: '1hour',
+    showDatePicker: false,
+    timeOptions
+  })
+
+  const formatTimeToString = (timestamp: number): string => {
+    return format(timestamp, 'yyyy-MM-dd HH:mm:ss')
+  }
+
+  const formatTimeData = (data: any[]) => {
+    return data.map(item => {
+      try {
+        const date = new Date(item.createTime)
+        return format(date, 'HH:mm:ss')
+      } catch (err) {
+        console.error('Error formatting time:', err)
+        return ''
+      }
+    })
+  }
+
+  const getChartOption = (title: string, data: any[], key: string): 
EChartsOption => ({
+    title: { 
+      text: title,
+      textStyle: {
+        fontSize: 14,
+        fontWeight: 'normal'
+      },
+      left: 'center'
+    },
+    tooltip: { 
+      show: true,
+      trigger: 'item',
+      axisPointer: {
+        type: 'none'
+      },
+      position: 'top',
+      backgroundColor: 'rgba(255, 255, 255, 0.95)',
+      borderColor: '#E5E5E5',
+      borderWidth: 1,
+      padding: [8, 12],
+      borderRadius: 4,
+      textStyle: {
+        color: '#595959',
+        fontSize: 13
+      },
+      formatter: (params: any) => {
+        let value = params.value
+        if (key.includes('Qps')) {
+          value = value.toFixed(2)
+        } else if (value >= 10000) {
+          value = (value / 10000).toFixed(1) + 'w'
+        } else {
+          value = Math.round(value)
+        }
+        
+        try {
+          const date = new 
Date(variables.metricsData[params.dataIndex].createTime)
+          const fullDateTime = format(date, 'yyyy-MM-dd HH:mm:ss')
+          
+          return `<div style="font-family: -apple-system, BlinkMacSystemFont, 
'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif">
+            <div style="color: #8c8c8c; font-size: 12px; margin-bottom: 4px">
+              ${fullDateTime}
+            </div>
+            <div style="display: flex; align-items: center">
+              <span style="display: inline-block; width: 6px; height: 6px; 
border-radius: 50%; background-color: ${params.color}; margin-right: 
8px"></span>
+              <span style="font-weight: 500">${value}</span>
+            </div>
+            <div style="font-size: 12px; color: #8c8c8c; margin-top: 4px">
+              ${title}
+            </div>
+          </div>`
+        } catch (err) {
+          console.error('Error formatting tooltip time:', err)
+          return ''
+        }
+      }
+    },
+    grid: {
+      top: '15%',
+      left: '3%',
+      right: '4%',
+      bottom: '3%',
+      containLabel: true
+    },
+    xAxis: {
+      type: 'category',
+      boundaryGap: false,
+      data: formatTimeData(data),
+      axisLine: {
+        lineStyle: {
+          color: '#E5E5E5'
+        }
+      },
+      axisLabel: {
+        color: '#7F7F7F',
+        formatter: (value: string) => {
+          return value.substring(value.indexOf(' ') + 1)
+        }
+      }
+    },
+    yAxis: { 
+      type: 'value',
+      splitLine: {
+        lineStyle: {
+          type: 'dashed',
+          color: '#E5E5E5'
+        }
+      },
+      axisLine: {
+        show: false
+      },
+      axisTick: {
+        show: false
+      },
+      axisLabel: {
+        color: '#7F7F7F',
+        formatter: (value: number) => {
+          if (key.includes('Qps')) {
+            return value.toFixed(2)
+          }
+          if (value >= 10000) {
+            return (value / 10000).toFixed(1) + 'w'
+          }
+          return Math.round(value).toString()
+        }
+      }
+    },
+    series: [{
+      type: 'line',
+      data: data.map(item => item[key]),
+      smooth: true,
+      symbol: 'circle',
+      symbolSize: 4,
+      showSymbol: true,
+      triggerEvent: true,
+      emphasis: {
+        focus: 'series',
+        itemStyle: {
+          color: '#1890FF',
+          borderWidth: 3,
+          borderColor: '#1890FF',
+          shadowBlur: 10,
+          shadowColor: 'rgba(0, 0, 0, 0.2)'
+        }
+      },
+      itemStyle: {
+        color: '#1890FF',
+        borderWidth: 1,
+        borderColor: '#fff',
+        opacity: 0.3
+      },
+      lineStyle: {
+        width: 2
+      },
+      areaStyle: {
+        color: new echarts.graphic.LinearGradient(0, 0, 0, 1, [
+          {
+            offset: 0,
+            color: 'rgba(24,144,255,0.3)'
+          },
+          {
+            offset: 1,
+            color: 'rgba(24,144,255,0.1)'
+          }
+        ])
+      }
+    } as LineSeriesOption],
+  })
+
+  /**
+   * (Re)creates an ECharts instance for each chart container that is set.
+   *
+   * For every container, any existing instance is disposed before
+   * `echarts.init` is called on it. All charts share one try/catch: an error
+   * is logged to the console (not rethrown) and stops the remaining charts
+   * from being initialized.
+   */
+  const initCharts = () => {
+    try {
+      const readRowCountEl = variables.readRowCountChartRef
+      if (readRowCountEl) {
+        variables.readRowCountChart?.dispose()
+        variables.readRowCountChart = echarts.init(readRowCountEl)
+      }
+
+      const writeRowCountEl = variables.writeRowCountChartRef
+      if (writeRowCountEl) {
+        variables.writeRowCountChart?.dispose()
+        variables.writeRowCountChart = echarts.init(writeRowCountEl)
+      }
+
+      const readQpsEl = variables.readQpsChartRef
+      if (readQpsEl) {
+        variables.readQpsChart?.dispose()
+        variables.readQpsChart = echarts.init(readQpsEl)
+      }
+
+      const writeQpsEl = variables.writeQpsChartRef
+      if (writeQpsEl) {
+        variables.writeQpsChart?.dispose()
+        variables.writeQpsChart = echarts.init(writeQpsEl)
+      }
+
+      const delayEl = variables.delayChartRef
+      if (delayEl) {
+        variables.delayChart?.dispose()
+        variables.delayChart = echarts.init(delayEl)
+      }
+    } catch (err) {
+      console.error('Failed to initialize charts:', err)
+    }
+  }
+
+  /** Looks up the `project.task.metrics.<key>` message through `t`. */
+  const getChartTitle = (key: string) => t(`project.task.metrics.${key}`)
+
+  /**
+   * Fetches the metrics history for the job instance named in the route query
+   * and pushes it into every chart instance that currently exists.
+   *
+   * When `variables.dateRange` is set, its two entries are sent as `startTime`
+   * and `endTime`, both formatted with the pattern `yyyy-MM-dd HH:mm:ss`.
+   * The response is stored in `variables.metricsData` before the charts are
+   * updated. Any error (request or rendering) is logged to the console and
+   * not rethrown.
+   */
+  const updateCharts = async () => {
+    try {
+      const params: any = {
+        jobInstanceId: route.query.jobInstanceId as string
+      }
+
+      if (variables.dateRange) {
+        // One shared pattern keeps startTime and endTime in the same format.
+        const timePattern = 'yyyy-MM-dd HH:mm:ss'
+        params.startTime = format(variables.dateRange[0], timePattern)
+        params.endTime = format(variables.dateRange[1], timePattern)
+      }
+
+      const res = await queryJobMetricsHistory(params)
+      variables.metricsData = res
+
+      // Charts that have not been created yet are skipped.
+      if (variables.readRowCountChart) {
+        variables.readRowCountChart.setOption(
+          getChartOption(getChartTitle('read_row_count'), variables.metricsData, 'readRowCount')
+        )
+      }
+      if (variables.writeRowCountChart) {
+        variables.writeRowCountChart.setOption(
+          getChartOption(getChartTitle('write_row_count'), variables.metricsData, 'writeRowCount')
+        )
+      }
+      if (variables.readQpsChart) {
+        variables.readQpsChart.setOption(
+          getChartOption(getChartTitle('read_qps'), variables.metricsData, 'readQps')
+        )
+      }
+      if (variables.writeQpsChart) {
+        variables.writeQpsChart.setOption(
+          getChartOption(getChartTitle('write_qps'), variables.metricsData, 'writeQps')
+        )
+      }
+      if (variables.delayChart) {
+        variables.delayChart.setOption(
+          getChartOption(getChartTitle('record_delay'), variables.metricsData, 'recordDelay')
+        )
+      }
+    } catch (err) {
+      console.error('Failed to fetch metrics data:', err)
+    }
+  }
+
+  /**
+   * Applies a time-filter selection.
+   *
+   * `'custom'` only reveals the date picker. Any other value hides the picker
+   * and, if `timeOptions` has a matching entry with a `getTime` function,
+   * stores that entry's start/end as epoch milliseconds in
+   * `variables.dateRange` and refreshes the charts.
+   */
+  const handleTimeOptionChange = (value: string) => {
+    variables.selectedTimeOption = value
+
+    const isCustom = value === 'custom'
+    variables.showDatePicker = isCustom
+    if (isCustom) {
+      return
+    }
+
+    const preset = timeOptions.find(candidate => candidate.value === value)
+    if (!preset?.getTime) {
+      return
+    }
+
+    const [rangeStart, rangeEnd] = preset.getTime()
+    variables.dateRange = [rangeStart.getTime(), rangeEnd.getTime()]
+    updateCharts()
+  }
+
+  /**
+   * Stores a manually chosen date range and marks the time option as
+   * `'custom'`. The charts are refreshed only when a range is present;
+   * a `null` value just clears the stored range.
+   */
+  const handleDateRangeChange = (value: [number, number] | null) => {
+    variables.selectedTimeOption = 'custom'
+    variables.dateRange = value
+    if (!value) {
+      return
+    }
+    updateCharts()
+  }
+
+  // Once mounted, select the '1hour' time option. For a non-custom value
+  // handleTimeOptionChange sets the date range and refreshes the charts,
+  // provided timeOptions has a matching entry.
+  onMounted(() => {
+    handleTimeOptionChange('1hour')
+  })
+
+  // Expose the shared state together with the chart and time-filter handlers.
+  return {
+    variables,
+    initCharts,
+    updateCharts,
+    handleDateRangeChange,
+    handleTimeOptionChange
+  }
+} 
\ No newline at end of file

Reply via email to