This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 94bb6350df [Feature][Zeta] Add metrics for task intermediate queue 
size (#9550)
94bb6350df is described below

commit 94bb6350df4ced5661268795aa8c43804983e196
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jul 22 11:27:51 2025 +0800

    [Feature][Zeta] Add metrics for task intermediate queue size (#9550)
---
 docs/en/seatunnel-engine/rest-api-v2.md              |  2 ++
 docs/zh/seatunnel-engine/rest-api-v2.md              |  1 +
 .../seatunnel/api/common/metrics/MetricNames.java    |  2 ++
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java   |  1 +
 ...Context.java => ConnectorMetricsCalcContext.java} | 12 ++++++------
 .../engine/server/rest/service/BaseService.java      |  7 ++++++-
 .../engine/server/task/SeaTunnelSourceCollector.java | 15 +++++++--------
 .../seatunnel/engine/server/task/SeaTunnelTask.java  |  2 +-
 .../engine/server/task/flow/SinkFlowLifeCycle.java   | 19 ++++++++++---------
 .../AbstractTaskGroupWithIntermediateQueue.java      |  4 +++-
 .../TaskGroupWithIntermediateBlockingQueue.java      | 20 ++++++++++++++++----
 .../group/TaskGroupWithIntermediateDisruptor.java    |  3 ++-
 .../task/group/queue/IntermediateBlockingQueue.java  |  9 ++++++++-
 .../engine/server/master/JobMetricsTest.java         | 17 +++++++++++++++++
 14 files changed, 82 insertions(+), 32 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api-v2.md 
b/docs/en/seatunnel-engine/rest-api-v2.md
index e5b85de941..3ea6242d57 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -157,6 +157,7 @@ Please refer [security](security.md)
     "pipelineEdges": {}
   },
   "metrics": {
+    "IntermediateQueueSize": "",
     "SourceReceivedCount": "",
     "SourceReceivedQPS": "",
     "SourceReceivedBytes": "",
@@ -237,6 +238,7 @@ This API has been deprecated, please use /job-info/:jobId 
instead
     "pipelineEdges": {}
   },
   "metrics": {
+    "IntermediateQueueSize": "",
     "SourceReceivedCount": "",
     "SourceReceivedQPS": "",
     "SourceReceivedBytes": "",
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md 
b/docs/zh/seatunnel-engine/rest-api-v2.md
index b2951b5e13..79b02c92e4 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -153,6 +153,7 @@ seatunnel:
     "pipelineEdges": {}
   },
   "metrics": {
+    "IntermediateQueueSize": "",
     "SourceReceivedCount": "",
     "SourceReceivedQPS": "",
     "SourceReceivedBytes": "",
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
index b1fc60e0f1..46c71047b9 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
@@ -33,4 +33,6 @@ public final class MetricNames {
     public static final String SINK_WRITE_BYTES = "SinkWriteBytes";
     public static final String SINK_WRITE_QPS = "SinkWriteQPS";
     public static final String SINK_WRITE_BYTES_PER_SECONDS = 
"SinkWriteBytesPerSeconds";
+
+    public static final String INTERMEDIATE_QUEUE_SIZE = 
"IntermediateQueueSize";
 }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 83b44130e1..b54911cd86 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -641,6 +641,7 @@ public class RestApiIT {
                                                         equalTo("5"))
                                                 
.body("metrics.SinkWriteCount", equalTo("5"))
                                                 
.body("metrics.SourceReceivedCount", equalTo("5"))
+                                                
.body("metrics.IntermediateQueueSize", equalTo("0"))
                                                 .body(
                                                         
"jobDag.envOptions.'job.mode'",
                                                         equalTo("BATCH"))
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
similarity index 94%
rename from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
rename to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
index 6890421f9f..ddf4cf18a0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
@@ -39,7 +39,7 @@ import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVE
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
 
-public class TaskMetricsCalcContext {
+public class ConnectorMetricsCalcContext {
 
     private final MetricsContext metricsContext;
 
@@ -47,21 +47,21 @@ public class TaskMetricsCalcContext {
 
     private Counter count;
 
-    private Map<String, Counter> countPerTable = new ConcurrentHashMap<>();
+    private final Map<String, Counter> countPerTable = new 
ConcurrentHashMap<>();
 
     private Meter QPS;
 
-    private Map<String, Meter> QPSPerTable = new ConcurrentHashMap<>();
+    private final Map<String, Meter> QPSPerTable = new ConcurrentHashMap<>();
 
     private Counter bytes;
 
-    private Map<String, Counter> bytesPerTable = new ConcurrentHashMap<>();
+    private final Map<String, Counter> bytesPerTable = new 
ConcurrentHashMap<>();
 
     private Meter bytesPerSeconds;
 
-    private Map<String, Meter> bytesPerSecondsPerTable = new 
ConcurrentHashMap<>();
+    private final Map<String, Meter> bytesPerSecondsPerTable = new 
ConcurrentHashMap<>();
 
-    public TaskMetricsCalcContext(
+    public ConnectorMetricsCalcContext(
             MetricsContext metricsContext,
             PluginType type,
             boolean isMulti,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index 5e81fc79a5..05e55a9378 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -78,6 +78,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.INTERMEDIATE_QUEUE_SIZE;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
@@ -245,7 +246,11 @@ public abstract class BaseService {
         Map<String, Object> metricsMap = new HashMap<>();
         // To add metrics, populate the corresponding array,
         String[] countMetricsNames = {
-            SOURCE_RECEIVED_COUNT, SINK_WRITE_COUNT, SOURCE_RECEIVED_BYTES, 
SINK_WRITE_BYTES
+            SOURCE_RECEIVED_COUNT,
+            SINK_WRITE_COUNT,
+            SOURCE_RECEIVED_BYTES,
+            SINK_WRITE_BYTES,
+            INTERMEDIATE_QUEUE_SIZE
         };
         String[] rateMetricsNames = {
             SOURCE_RECEIVED_QPS,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index b3cafb9d68..04ccbda35d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -32,7 +32,7 @@ import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
 import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
-import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
+import org.apache.seatunnel.engine.server.metrics.ConnectorMetricsCalcContext;
 import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -52,9 +52,7 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
 
     private final List<OneInputFlowLifeCycle<Record<?>>> outputs;
 
-    private final MetricsContext metricsContext;
-
-    private final TaskMetricsCalcContext taskMetricsCalcContext;
+    private final ConnectorMetricsCalcContext connectorMetricsCalcContext;
 
     private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new 
AtomicBoolean(false);
 
@@ -77,14 +75,13 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
         this.checkpointLock = checkpointLock;
         this.outputs = outputs;
         this.rowType = rowType;
-        this.metricsContext = metricsContext;
         if (rowType instanceof MultipleRowType) {
             ((MultipleRowType) rowType)
                     .iterator()
                     .forEachRemaining(type -> 
this.rowTypeMap.put(type.getKey(), type.getValue()));
         }
-        this.taskMetricsCalcContext =
-                new TaskMetricsCalcContext(
+        this.connectorMetricsCalcContext =
+                new ConnectorMetricsCalcContext(
                         metricsContext,
                         PluginType.SOURCE,
                         CollectionUtils.isNotEmpty(tablePaths),
@@ -97,6 +94,8 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
         try {
             if (row instanceof SeaTunnelRow) {
                 String tableId = ((SeaTunnelRow) row).getTableId();
+                // init the size of row early with rowType, this way is faster 
than init the size
+                // without rowType
                 int size;
                 if (rowType instanceof SeaTunnelRowType) {
                     size = ((SeaTunnelRow) 
row).getBytesSize((SeaTunnelRowType) rowType);
@@ -107,7 +106,7 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
                             "Unsupported row type: " + 
rowType.getClass().getName());
                 }
                 flowControlGate.audit((SeaTunnelRow) row);
-                taskMetricsCalcContext.updateMetrics(row, tableId);
+                connectorMetricsCalcContext.updateMetrics(row, tableId);
             }
             sendRecordToNext(new Record<>(row));
             emptyThisPollNext = false;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 8e06eab6c1..d95f7ef9eb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -245,7 +245,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
                             this,
                             completableFuture,
                             ((AbstractTaskGroupWithIntermediateQueue) 
taskBelongGroup)
-                                    .getQueueCache(config.getQueueID()));
+                                    .getQueueCache(config.getQueueID(), 
this.getMetricsContext()));
             outputs = flowLifeCycles;
         } else {
             throw new UnknownFlowException(flow);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 95996e3b8b..c039e97a76 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -39,7 +39,7 @@ import 
org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.event.JobEventListener;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
+import org.apache.seatunnel.engine.server.metrics.ConnectorMetricsCalcContext;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
 import 
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
@@ -91,15 +91,15 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
 
     private Optional<CommitInfoT> lastCommitInfo;
 
-    private MetricsContext metricsContext;
+    private final MetricsContext metricsContext;
 
-    private TaskMetricsCalcContext taskMetricsCalcContext;
+    private final ConnectorMetricsCalcContext connectorMetricsCalcContext;
 
     private final boolean containAggCommitter;
 
-    private EventListener eventListener;
+    private final EventListener eventListener;
 
-    /** Mapping relationship between upstream tablepath and downstream 
tablepath. */
+    /** Mapping relationship between upstream TablePath and downstream 
TablePath. */
     private final Map<TablePath, TablePath> tablesMaps = new HashMap<>();
 
     public SinkFlowLifeCycle(
@@ -139,8 +139,9 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                 sinkTables.add(TablePath.DEFAULT);
             }
         }
-        this.taskMetricsCalcContext =
-                new TaskMetricsCalcContext(metricsContext, PluginType.SINK, 
isMulti, sinkTables);
+        this.connectorMetricsCalcContext =
+                new ConnectorMetricsCalcContext(
+                        metricsContext, PluginType.SINK, isMulti, sinkTables);
     }
 
     @Override
@@ -264,7 +265,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                 if (prepareClose) {
                     return;
                 }
-                String tableId = "";
+                String tableId;
                 writer.write((T) record.getData());
                 if (record.getData() instanceof SeaTunnelRow) {
                     if (this.sinkAction.getSink() instanceof MultiTableSink) {
@@ -295,7 +296,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                                         
.orElseGet(TablePath.DEFAULT::getFullName);
                     }
 
-                    taskMetricsCalcContext.updateMetrics(record.getData(), 
tableId);
+                    
connectorMetricsCalcContext.updateMetrics(record.getData(), tableId);
                 }
             }
         } catch (Exception e) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
index 137157b34a..2238c6d086 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.task.group;
 
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
@@ -30,5 +31,6 @@ public abstract class AbstractTaskGroupWithIntermediateQueue 
extends TaskGroupDe
         super(taskGroupLocation, taskGroupName, tasks);
     }
 
-    public abstract AbstractIntermediateQueue<?> getQueueCache(long id);
+    public abstract AbstractIntermediateQueue<?> getQueueCache(
+            long id, MetricsContext metricsContext);
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
index f1153364f6..2abed37bd4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.task.group;
 
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
@@ -25,12 +27,16 @@ import 
org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import 
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
 import 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.INTERMEDIATE_QUEUE_SIZE;
+
 public class TaskGroupWithIntermediateBlockingQueue extends 
AbstractTaskGroupWithIntermediateQueue {
 
     public static final int QUEUE_SIZE = 2048;
@@ -40,7 +46,7 @@ public class TaskGroupWithIntermediateBlockingQueue extends 
AbstractTaskGroupWit
         super(taskGroupLocation, taskGroupName, tasks);
     }
 
-    private Map<Long, BlockingQueue<Record<?>>> blockingQueueCache = null;
+    private Map<Long, Pair<BlockingQueue<Record<?>>, Counter>> 
blockingQueueCache = null;
 
     @Override
     public void init() {
@@ -52,9 +58,15 @@ public class TaskGroupWithIntermediateBlockingQueue extends 
AbstractTaskGroupWit
     }
 
     @Override
-    public AbstractIntermediateQueue<?> getQueueCache(long id) {
-        blockingQueueCache.computeIfAbsent(id, i -> new 
ArrayBlockingQueue<>(QUEUE_SIZE));
-        return new IntermediateBlockingQueue(blockingQueueCache.get(id));
+    public AbstractIntermediateQueue<?> getQueueCache(long id, MetricsContext 
metricsContext) {
+        blockingQueueCache.computeIfAbsent(
+                id,
+                i ->
+                        Pair.of(
+                                new ArrayBlockingQueue<>(QUEUE_SIZE),
+                                
metricsContext.counter(INTERMEDIATE_QUEUE_SIZE)));
+        Pair<BlockingQueue<Record<?>>, Counter> cache = 
blockingQueueCache.get(id);
+        return new IntermediateBlockingQueue(cache.getLeft(), 
cache.getRight());
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
index aabb53ae3f..4f768d955e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.task.group;
 
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskGroupType;
@@ -57,7 +58,7 @@ public class TaskGroupWithIntermediateDisruptor extends 
AbstractTaskGroupWithInt
     }
 
     @Override
-    public AbstractIntermediateQueue<?> getQueueCache(long id) {
+    public AbstractIntermediateQueue<?> getQueueCache(long id, MetricsContext 
metricsContext) {
         EventFactory<RecordEvent> eventFactory = new RecordEventFactory();
         Disruptor<RecordEvent> disruptor =
                 new Disruptor<>(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
index b8e53faabd..08afa086f6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.task.group.queue;
 
+import org.apache.seatunnel.api.common.metrics.Counter;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
 import org.apache.seatunnel.common.utils.function.ConsumerWithException;
@@ -29,14 +30,19 @@ import java.util.concurrent.TimeUnit;
 
 public class IntermediateBlockingQueue extends 
AbstractIntermediateQueue<BlockingQueue<Record<?>>> {
 
-    public IntermediateBlockingQueue(BlockingQueue<Record<?>> queue) {
+    private final Counter intermediateQueueSize;
+
+    public IntermediateBlockingQueue(
+            BlockingQueue<Record<?>> queue, Counter intermediateQueueSize) {
         super(queue);
+        this.intermediateQueueSize = intermediateQueueSize;
     }
 
     @Override
     public void received(Record<?> record) {
         try {
             handleRecord(record, getIntermediateQueue()::put);
+            intermediateQueueSize.inc();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -48,6 +54,7 @@ public class IntermediateBlockingQueue extends 
AbstractIntermediateQueue<Blockin
             Record<?> record = getIntermediateQueue().poll(100, 
TimeUnit.MILLISECONDS);
             if (record != null) {
                 handleRecord(record, collector::collect);
+                intermediateQueueSize.dec();
             } else {
                 break;
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 6131ca88be..8fcd93eb0c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.INTERMEDIATE_QUEUE_SIZE;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
@@ -89,6 +90,22 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
         assertEquals(30, (Long) 
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
         assertTrue((Double) jobMetrics.get(SOURCE_RECEIVED_QPS).get(0).value() 
> 0);
         assertTrue((Double) jobMetrics.get(SINK_WRITE_QPS).get(0).value() > 0);
+        assertEquals(0, (Long) 
jobMetrics.get(INTERMEDIATE_QUEUE_SIZE).get(0).value());
+    }
+
+    @Test
+    public void testMetricsWhenJobFailed() {
+        long jobId = System.currentTimeMillis();
+        startJob(jobId, "stream_fake_to_inmemory_with_error.conf", false);
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.FAILED,
+                                        
server.getCoordinatorService().getJobStatus(jobId)));
+
+        JobMetrics jobMetrics = 
server.getCoordinatorService().getJobMetrics(jobId);
+        assertTrue((Long) 
jobMetrics.get(INTERMEDIATE_QUEUE_SIZE).get(0).value() > 0);
     }
 
     @Test

Reply via email to