This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 09a15adb0a Fix Metrics lost data bug (#6976) 09a15adb0a is described below commit 09a15adb0af91880125c60cc8dfe9f4d4358f557 Author: Eric <gaojun2...@gmail.com> AuthorDate: Thu Jun 13 11:03:45 2024 +0800 Fix Metrics lost data bug (#6976) --- .../seatunnel/engine/server/TaskExecutionService.java | 3 ++- .../engine/server/checkpoint/CheckpointManager.java | 5 +++++ .../apache/seatunnel/engine/server/master/JobMaster.java | 3 ++- .../seatunnel/engine/server/master/JobMetricsTest.java | 14 ++++++++------ 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index 049c9c374a..93dbd3eca6 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -627,7 +627,7 @@ public class TaskExecutionService implements DynamicMetricsProvider { try { if (!metricsImap.tryLock( Constant.IMAP_RUNNING_JOB_METRICS_KEY, 2, TimeUnit.SECONDS)) { - logger.info("try lock failed in update metrics"); + logger.warning("try lock failed in update metrics"); return; } HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap = @@ -953,6 +953,7 @@ public class TaskExecutionService implements DynamicMetricsProvider { taskGroupLocation, executionContexts.remove(taskGroupLocation)); cancellationFutures.remove(taskGroupLocation); cancelAsyncFunction(taskGroupLocation); + updateMetricsContextInImap(); if (ex == null) { future.complete( new TaskExecutionState(taskGroupLocation, ExecutionState.FINISHED)); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index 21c2b90df5..a38703dee8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusO import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeAfterCheckpointOperation; import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeBeforeCheckpointOperation; import org.apache.seatunnel.engine.server.dag.execution.Pipeline; +import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation; import org.apache.seatunnel.engine.server.dag.physical.SubPlan; import org.apache.seatunnel.engine.server.execution.Task; import org.apache.seatunnel.engine.server.execution.TaskLocation; @@ -314,6 +315,10 @@ public class CheckpointManager { .orElse(false); } + public boolean isPipelineSavePointEnd(PipelineLocation pipelineLocation) { + return coordinatorMap.get(pipelineLocation.getPipelineId()).isEndOfSavePoint(); + } + protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) { log.debug( "Sead Operation : " diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 7c7b26fa16..2e9e168af2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -609,7 +609,8 @@ public class JobMaster { public void removeMetricsContext( PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) { - if (pipelineStatus.equals(PipelineStatus.FINISHED) && !checkpointManager.isSavePointEnd() + if ((pipelineStatus.equals(PipelineStatus.FINISHED) + && !checkpointManager.isPipelineSavePointEnd(pipelineLocation)) || pipelineStatus.equals(PipelineStatus.CANCELED)) { try { metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java index 0e6202a0a6..6131ca88be 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java @@ -135,18 +135,20 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest { .untilAsserted( () -> { JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3); - assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(0).value()); - assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(1).value()); assertTrue( - 40 - < (Long) + 100 <= (Long) jobMetrics.get(SINK_WRITE_COUNT).get(0).value()); + assertTrue( + 100 <= (Long) jobMetrics.get(SINK_WRITE_COUNT).get(1).value()); + assertTrue( + 100 + <= (Long) jobMetrics .get(SOURCE_RECEIVED_COUNT) .get(0) .value()); assertTrue( - 40 - < (Long) + 100 + <= (Long) jobMetrics .get(SOURCE_RECEIVED_COUNT) .get(1)