This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 09a15adb0a Fix Metrics lost data bug (#6976)
09a15adb0a is described below

commit 09a15adb0af91880125c60cc8dfe9f4d4358f557
Author: Eric <gaojun2...@gmail.com>
AuthorDate: Thu Jun 13 11:03:45 2024 +0800

    Fix Metrics lost data bug (#6976)
---
 .../seatunnel/engine/server/TaskExecutionService.java      |  3 ++-
 .../engine/server/checkpoint/CheckpointManager.java        |  5 +++++
 .../apache/seatunnel/engine/server/master/JobMaster.java   |  3 ++-
 .../seatunnel/engine/server/master/JobMetricsTest.java     | 14 ++++++++------
 4 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 049c9c374a..93dbd3eca6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -627,7 +627,7 @@ public class TaskExecutionService implements DynamicMetricsProvider {
             try {
                 if (!metricsImap.tryLock(
                         Constant.IMAP_RUNNING_JOB_METRICS_KEY, 2, 
TimeUnit.SECONDS)) {
-                    logger.info("try lock failed in update metrics");
+                    logger.warning("try lock failed in update metrics");
                     return;
                 }
                 HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
@@ -953,6 +953,7 @@ public class TaskExecutionService implements DynamicMetricsProvider {
                         taskGroupLocation, 
executionContexts.remove(taskGroupLocation));
                 cancellationFutures.remove(taskGroupLocation);
                 cancelAsyncFunction(taskGroupLocation);
+                updateMetricsContextInImap();
                 if (ex == null) {
                     future.complete(
                             new TaskExecutionState(taskGroupLocation, 
ExecutionState.FINISHED));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 21c2b90df5..a38703dee8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusO
 import 
org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeAfterCheckpointOperation;
 import 
org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeBeforeCheckpointOperation;
 import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
+import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -314,6 +315,10 @@ public class CheckpointManager {
                 .orElse(false);
     }
 
+    public boolean isPipelineSavePointEnd(PipelineLocation pipelineLocation) {
+        return 
coordinatorMap.get(pipelineLocation.getPipelineId()).isEndOfSavePoint();
+    }
+
     protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation 
operation) {
         log.debug(
                 "Sead Operation : "
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 7c7b26fa16..2e9e168af2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -609,7 +609,8 @@ public class JobMaster {
 
     public void removeMetricsContext(
             PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
-        if (pipelineStatus.equals(PipelineStatus.FINISHED) && 
!checkpointManager.isSavePointEnd()
+        if ((pipelineStatus.equals(PipelineStatus.FINISHED)
+                        && 
!checkpointManager.isPipelineSavePointEnd(pipelineLocation))
                 || pipelineStatus.equals(PipelineStatus.CANCELED)) {
             try {
                 metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 0e6202a0a6..6131ca88be 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -135,18 +135,20 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
                 .untilAsserted(
                         () -> {
                             JobMetrics jobMetrics = 
coordinatorService.getJobMetrics(jobId3);
-                            assertTrue(40 < (Long) 
jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
-                            assertTrue(40 < (Long) 
jobMetrics.get(SINK_WRITE_COUNT).get(1).value());
                             assertTrue(
-                                    40
-                                            < (Long)
+                                    100 <= (Long) 
jobMetrics.get(SINK_WRITE_COUNT).get(0).value());
+                            assertTrue(
+                                    100 <= (Long) 
jobMetrics.get(SINK_WRITE_COUNT).get(1).value());
+                            assertTrue(
+                                    100
+                                            <= (Long)
                                                     jobMetrics
                                                             
.get(SOURCE_RECEIVED_COUNT)
                                                             .get(0)
                                                             .value());
                             assertTrue(
-                                    40
-                                            < (Long)
+                                    100
+                                            <= (Long)
                                                     jobMetrics
                                                             
.get(SOURCE_RECEIVED_COUNT)
                                                             .get(1)

Reply via email to