This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 94bb6350df [Feature][Zeta] Add metrics for task intermediate queue size (#9550)
94bb6350df is described below
commit 94bb6350df4ced5661268795aa8c43804983e196
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jul 22 11:27:51 2025 +0800
[Feature][Zeta] Add metrics for task intermediate queue size (#9550)
---
docs/en/seatunnel-engine/rest-api-v2.md | 2 ++
docs/zh/seatunnel-engine/rest-api-v2.md | 1 +
.../seatunnel/api/common/metrics/MetricNames.java | 2 ++
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 1 +
...Context.java => ConnectorMetricsCalcContext.java} | 12 ++++++------
.../engine/server/rest/service/BaseService.java | 7 ++++++-
.../engine/server/task/SeaTunnelSourceCollector.java | 15 +++++++--------
.../seatunnel/engine/server/task/SeaTunnelTask.java | 2 +-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 19 ++++++++++---------
.../AbstractTaskGroupWithIntermediateQueue.java | 4 +++-
.../TaskGroupWithIntermediateBlockingQueue.java | 20 ++++++++++++++++----
.../group/TaskGroupWithIntermediateDisruptor.java | 3 ++-
.../task/group/queue/IntermediateBlockingQueue.java | 9 ++++++++-
.../engine/server/master/JobMetricsTest.java | 17 +++++++++++++++++
14 files changed, 82 insertions(+), 32 deletions(-)
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md
b/docs/en/seatunnel-engine/rest-api-v2.md
index e5b85de941..3ea6242d57 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -157,6 +157,7 @@ Please refer [security](security.md)
"pipelineEdges": {}
},
"metrics": {
+ "IntermediateQueueSize": "",
"SourceReceivedCount": "",
"SourceReceivedQPS": "",
"SourceReceivedBytes": "",
@@ -237,6 +238,7 @@ This API has been deprecated, please use /job-info/:jobId
instead
"pipelineEdges": {}
},
"metrics": {
+ "IntermediateQueueSize": "",
"SourceReceivedCount": "",
"SourceReceivedQPS": "",
"SourceReceivedBytes": "",
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md
b/docs/zh/seatunnel-engine/rest-api-v2.md
index b2951b5e13..79b02c92e4 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -153,6 +153,7 @@ seatunnel:
"pipelineEdges": {}
},
"metrics": {
+ "IntermediateQueueSize": "",
"SourceReceivedCount": "",
"SourceReceivedQPS": "",
"SourceReceivedBytes": "",
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
index b1fc60e0f1..46c71047b9 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
@@ -33,4 +33,6 @@ public final class MetricNames {
public static final String SINK_WRITE_BYTES = "SinkWriteBytes";
public static final String SINK_WRITE_QPS = "SinkWriteQPS";
public static final String SINK_WRITE_BYTES_PER_SECONDS =
"SinkWriteBytesPerSeconds";
+
+ public static final String INTERMEDIATE_QUEUE_SIZE =
"IntermediateQueueSize";
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 83b44130e1..b54911cd86 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -641,6 +641,7 @@ public class RestApiIT {
equalTo("5"))
.body("metrics.SinkWriteCount", equalTo("5"))
.body("metrics.SourceReceivedCount", equalTo("5"))
+
.body("metrics.IntermediateQueueSize", equalTo("0"))
.body(
"jobDag.envOptions.'job.mode'",
equalTo("BATCH"))
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
similarity index 94%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
rename to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
index 6890421f9f..ddf4cf18a0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
@@ -39,7 +39,7 @@ import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVE
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
-public class TaskMetricsCalcContext {
+public class ConnectorMetricsCalcContext {
private final MetricsContext metricsContext;
@@ -47,21 +47,21 @@ public class TaskMetricsCalcContext {
private Counter count;
- private Map<String, Counter> countPerTable = new ConcurrentHashMap<>();
+ private final Map<String, Counter> countPerTable = new
ConcurrentHashMap<>();
private Meter QPS;
- private Map<String, Meter> QPSPerTable = new ConcurrentHashMap<>();
+ private final Map<String, Meter> QPSPerTable = new ConcurrentHashMap<>();
private Counter bytes;
- private Map<String, Counter> bytesPerTable = new ConcurrentHashMap<>();
+ private final Map<String, Counter> bytesPerTable = new
ConcurrentHashMap<>();
private Meter bytesPerSeconds;
- private Map<String, Meter> bytesPerSecondsPerTable = new
ConcurrentHashMap<>();
+ private final Map<String, Meter> bytesPerSecondsPerTable = new
ConcurrentHashMap<>();
- public TaskMetricsCalcContext(
+ public ConnectorMetricsCalcContext(
MetricsContext metricsContext,
PluginType type,
boolean isMulti,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index 5e81fc79a5..05e55a9378 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -78,6 +78,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.INTERMEDIATE_QUEUE_SIZE;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
@@ -245,7 +246,11 @@ public abstract class BaseService {
Map<String, Object> metricsMap = new HashMap<>();
// To add metrics, populate the corresponding array,
String[] countMetricsNames = {
- SOURCE_RECEIVED_COUNT, SINK_WRITE_COUNT, SOURCE_RECEIVED_BYTES,
SINK_WRITE_BYTES
+ SOURCE_RECEIVED_COUNT,
+ SINK_WRITE_COUNT,
+ SOURCE_RECEIVED_BYTES,
+ SINK_WRITE_BYTES,
+ INTERMEDIATE_QUEUE_SIZE
};
String[] rateMetricsNames = {
SOURCE_RECEIVED_QPS,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index b3cafb9d68..04ccbda35d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -32,7 +32,7 @@ import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
-import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
+import org.apache.seatunnel.engine.server.metrics.ConnectorMetricsCalcContext;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
import org.apache.commons.collections4.CollectionUtils;
@@ -52,9 +52,7 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
private final List<OneInputFlowLifeCycle<Record<?>>> outputs;
- private final MetricsContext metricsContext;
-
- private final TaskMetricsCalcContext taskMetricsCalcContext;
+ private final ConnectorMetricsCalcContext connectorMetricsCalcContext;
private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new
AtomicBoolean(false);
@@ -77,14 +75,13 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
this.checkpointLock = checkpointLock;
this.outputs = outputs;
this.rowType = rowType;
- this.metricsContext = metricsContext;
if (rowType instanceof MultipleRowType) {
((MultipleRowType) rowType)
.iterator()
.forEachRemaining(type ->
this.rowTypeMap.put(type.getKey(), type.getValue()));
}
- this.taskMetricsCalcContext =
- new TaskMetricsCalcContext(
+ this.connectorMetricsCalcContext =
+ new ConnectorMetricsCalcContext(
metricsContext,
PluginType.SOURCE,
CollectionUtils.isNotEmpty(tablePaths),
@@ -97,6 +94,8 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
try {
if (row instanceof SeaTunnelRow) {
String tableId = ((SeaTunnelRow) row).getTableId();
+ // init the size of row early with rowType, this way is faster than init the size
+ // without rowType
int size;
if (rowType instanceof SeaTunnelRowType) {
size = ((SeaTunnelRow)
row).getBytesSize((SeaTunnelRowType) rowType);
@@ -107,7 +106,7 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
"Unsupported row type: " +
rowType.getClass().getName());
}
flowControlGate.audit((SeaTunnelRow) row);
- taskMetricsCalcContext.updateMetrics(row, tableId);
+ connectorMetricsCalcContext.updateMetrics(row, tableId);
}
sendRecordToNext(new Record<>(row));
emptyThisPollNext = false;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 8e06eab6c1..d95f7ef9eb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -245,7 +245,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
this,
completableFuture,
((AbstractTaskGroupWithIntermediateQueue)
taskBelongGroup)
- .getQueueCache(config.getQueueID()));
+ .getQueueCache(config.getQueueID(),
this.getMetricsContext()));
outputs = flowLifeCycles;
} else {
throw new UnknownFlowException(flow);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 95996e3b8b..c039e97a76 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -39,7 +39,7 @@ import
org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.event.JobEventListener;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
+import org.apache.seatunnel.engine.server.metrics.ConnectorMetricsCalcContext;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
import
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
@@ -91,15 +91,15 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
private Optional<CommitInfoT> lastCommitInfo;
- private MetricsContext metricsContext;
+ private final MetricsContext metricsContext;
- private TaskMetricsCalcContext taskMetricsCalcContext;
+ private final ConnectorMetricsCalcContext connectorMetricsCalcContext;
private final boolean containAggCommitter;
- private EventListener eventListener;
+ private final EventListener eventListener;
- /** Mapping relationship between upstream tablepath and downstream
tablepath. */
+ /** Mapping relationship between upstream TablePath and downstream
TablePath. */
private final Map<TablePath, TablePath> tablesMaps = new HashMap<>();
public SinkFlowLifeCycle(
@@ -139,8 +139,9 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
sinkTables.add(TablePath.DEFAULT);
}
}
- this.taskMetricsCalcContext =
- new TaskMetricsCalcContext(metricsContext, PluginType.SINK,
isMulti, sinkTables);
+ this.connectorMetricsCalcContext =
+ new ConnectorMetricsCalcContext(
+ metricsContext, PluginType.SINK, isMulti, sinkTables);
}
@Override
@@ -264,7 +265,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
if (prepareClose) {
return;
}
- String tableId = "";
+ String tableId;
writer.write((T) record.getData());
if (record.getData() instanceof SeaTunnelRow) {
if (this.sinkAction.getSink() instanceof MultiTableSink) {
@@ -295,7 +296,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
.orElseGet(TablePath.DEFAULT::getFullName);
}
- taskMetricsCalcContext.updateMetrics(record.getData(),
tableId);
+
connectorMetricsCalcContext.updateMetrics(record.getData(), tableId);
}
}
} catch (Exception e) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
index 137157b34a..2238c6d086 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.task.group;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
@@ -30,5 +31,6 @@ public abstract class AbstractTaskGroupWithIntermediateQueue
extends TaskGroupDe
super(taskGroupLocation, taskGroupName, tasks);
}
- public abstract AbstractIntermediateQueue<?> getQueueCache(long id);
+ public abstract AbstractIntermediateQueue<?> getQueueCache(
+ long id, MetricsContext metricsContext);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
index f1153364f6..2abed37bd4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.task.group;
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
@@ -25,12 +27,16 @@ import
org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
import
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue;
+import org.apache.commons.lang3.tuple.Pair;
+
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.INTERMEDIATE_QUEUE_SIZE;
+
public class TaskGroupWithIntermediateBlockingQueue extends
AbstractTaskGroupWithIntermediateQueue {
public static final int QUEUE_SIZE = 2048;
@@ -40,7 +46,7 @@ public class TaskGroupWithIntermediateBlockingQueue extends
AbstractTaskGroupWit
super(taskGroupLocation, taskGroupName, tasks);
}
- private Map<Long, BlockingQueue<Record<?>>> blockingQueueCache = null;
+ private Map<Long, Pair<BlockingQueue<Record<?>>, Counter>>
blockingQueueCache = null;
@Override
public void init() {
@@ -52,9 +58,15 @@ public class TaskGroupWithIntermediateBlockingQueue extends
AbstractTaskGroupWit
}
@Override
- public AbstractIntermediateQueue<?> getQueueCache(long id) {
- blockingQueueCache.computeIfAbsent(id, i -> new
ArrayBlockingQueue<>(QUEUE_SIZE));
- return new IntermediateBlockingQueue(blockingQueueCache.get(id));
+ public AbstractIntermediateQueue<?> getQueueCache(long id, MetricsContext
metricsContext) {
+ blockingQueueCache.computeIfAbsent(
+ id,
+ i ->
+ Pair.of(
+ new ArrayBlockingQueue<>(QUEUE_SIZE),
+
metricsContext.counter(INTERMEDIATE_QUEUE_SIZE)));
+ Pair<BlockingQueue<Record<?>>, Counter> cache =
blockingQueueCache.get(id);
+ return new IntermediateBlockingQueue(cache.getLeft(),
cache.getRight());
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
index aabb53ae3f..4f768d955e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.task.group;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskGroupType;
@@ -57,7 +58,7 @@ public class TaskGroupWithIntermediateDisruptor extends
AbstractTaskGroupWithInt
}
@Override
- public AbstractIntermediateQueue<?> getQueueCache(long id) {
+ public AbstractIntermediateQueue<?> getQueueCache(long id, MetricsContext
metricsContext) {
EventFactory<RecordEvent> eventFactory = new RecordEventFactory();
Disruptor<RecordEvent> disruptor =
new Disruptor<>(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
index b8e53faabd..08afa086f6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.task.group.queue;
+import org.apache.seatunnel.api.common.metrics.Counter;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.common.utils.function.ConsumerWithException;
@@ -29,14 +30,19 @@ import java.util.concurrent.TimeUnit;
public class IntermediateBlockingQueue extends
AbstractIntermediateQueue<BlockingQueue<Record<?>>> {
- public IntermediateBlockingQueue(BlockingQueue<Record<?>> queue) {
+ private final Counter intermediateQueueSize;
+
+ public IntermediateBlockingQueue(
+ BlockingQueue<Record<?>> queue, Counter intermediateQueueSize) {
super(queue);
+ this.intermediateQueueSize = intermediateQueueSize;
}
@Override
public void received(Record<?> record) {
try {
handleRecord(record, getIntermediateQueue()::put);
+ intermediateQueueSize.inc();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -48,6 +54,7 @@ public class IntermediateBlockingQueue extends
AbstractIntermediateQueue<Blockin
Record<?> record = getIntermediateQueue().poll(100,
TimeUnit.MILLISECONDS);
if (record != null) {
handleRecord(record, collector::collect);
+ intermediateQueueSize.dec();
} else {
break;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index 6131ca88be..8fcd93eb0c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.INTERMEDIATE_QUEUE_SIZE;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
@@ -89,6 +90,22 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
assertEquals(30, (Long)
jobMetrics.get(SOURCE_RECEIVED_COUNT).get(0).value());
assertTrue((Double) jobMetrics.get(SOURCE_RECEIVED_QPS).get(0).value()
> 0);
assertTrue((Double) jobMetrics.get(SINK_WRITE_QPS).get(0).value() > 0);
+ assertEquals(0, (Long)
jobMetrics.get(INTERMEDIATE_QUEUE_SIZE).get(0).value());
+ }
+
+ @Test
+ public void testMetricsWhenJobFailed() {
+ long jobId = System.currentTimeMillis();
+ startJob(jobId, "stream_fake_to_inmemory_with_error.conf", false);
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FAILED,
+
server.getCoordinatorService().getJobStatus(jobId)));
+
+ JobMetrics jobMetrics =
server.getCoordinatorService().getJobMetrics(jobId);
+ assertTrue((Long)
jobMetrics.get(INTERMEDIATE_QUEUE_SIZE).get(0).value() > 0);
}
@Test