This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 28340ca90 [INLONG-6022][Agent] Fix lost read and send count (#6023)
28340ca90 is described below

commit 28340ca905107f91777a01e17f3285d325bc3cda
Author: Lucas <100204617+lucaspeng12...@users.noreply.github.com>
AuthorDate: Tue Sep 27 20:08:53 2022 +0800

    [INLONG-6022][Agent] Fix lost read and send count (#6023)
---
 .../inlong/agent/metrics/AgentMetricItem.java      |  7 ++-
 .../inlong/agent/plugin/channel/MemoryChannel.java | 70 ++++++++++++++--------
 .../inlong/agent/plugin/sinks/SenderManager.java   |  6 ++
 .../agent/plugin/sources/reader/BinlogReader.java  |  3 +-
 .../agent/plugin/sources/reader/KafkaReader.java   |  1 +
 .../agent/plugin/sources/reader/MongoDBReader.java |  4 +-
 .../plugin/sources/reader/PostgreSQLReader.java    |  3 +-
 .../plugin/sources/reader/SQLServerReader.java     |  2 +
 .../agent/plugin/sources/reader/SqlReader.java     |  2 +
 .../sources/reader/file/FileReaderOperator.java    |  1 +
 .../agent/plugin/sources/TestSQLServerReader.java  |  4 ++
 11 files changed, 70 insertions(+), 33 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
index dbec354ff..96d35f97c 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.metrics;
 
 import org.apache.inlong.common.metric.CountMetric;
 import org.apache.inlong.common.metric.Dimension;
+import org.apache.inlong.common.metric.GaugeMetric;
 import org.apache.inlong.common.metric.MetricDomain;
 import org.apache.inlong.common.metric.MetricItem;
 
@@ -61,13 +62,13 @@ public class AgentMetricItem extends MetricItem {
     @Dimension
     public String inlongStreamId;
 
-    @CountMetric
+    @GaugeMetric
     public AtomicLong jobRunningCount = new AtomicLong(0);
     @CountMetric
     public AtomicLong jobFatalCount = new AtomicLong(0);
-    @CountMetric
+    @GaugeMetric
     public AtomicLong taskRunningCount = new AtomicLong(0);
-    @CountMetric
+    @GaugeMetric
     public AtomicLong taskRetryingCount = new AtomicLong(0);
     @CountMetric
     public AtomicLong taskFatalCount = new AtomicLong(0);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
index 9125b5852..086d45b8b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
@@ -19,7 +19,6 @@ package org.apache.inlong.agent.plugin.channel;
 
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
 import org.apache.inlong.agent.plugin.Channel;
@@ -35,7 +34,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
 
 /**
@@ -49,38 +52,31 @@ public class MemoryChannel implements Channel {
     //metric
     private AgentMetricItemSet metricItemSet;
     private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
+    private String inlongGroupId;
+    private String inlongStreamId;
 
     public MemoryChannel() {
     }
 
     @Override
     public void push(Message message) {
-        String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
         try {
             if (message != null) {
-                if (message instanceof ProxyMessage) {
-                    groupId = ((ProxyMessage) message).getInlongGroupId();
-                }
-                AgentMetricItem metricItem = 
getMetricItem(KEY_INLONG_GROUP_ID, groupId);
+                AgentMetricItem metricItem = getMetricItem(new HashMap<String, 
String>());
                 metricItem.pluginReadCount.incrementAndGet();
                 queue.put(message);
                 metricItem.pluginReadSuccessCount.incrementAndGet();
             }
         } catch (InterruptedException ex) {
-            getMetricItem(KEY_INLONG_GROUP_ID, 
groupId).pluginReadFailCount.incrementAndGet();
-            Thread.currentThread().interrupt();
+            this.metricItemReadFailed();
         }
     }
 
     @Override
     public boolean push(Message message, long timeout, TimeUnit unit) {
-        String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
         try {
             if (message != null) {
-                if (message instanceof ProxyMessage) {
-                    groupId = ((ProxyMessage) message).getInlongGroupId();
-                }
-                AgentMetricItem metricItem = 
getMetricItem(KEY_INLONG_GROUP_ID, groupId);
+                AgentMetricItem metricItem = getMetricItem(new HashMap<String, 
String>());
                 metricItem.pluginReadCount.incrementAndGet();
                 boolean result = queue.offer(message, timeout, unit);
                 if (result) {
@@ -91,36 +87,31 @@ public class MemoryChannel implements Channel {
                 return result;
             }
         } catch (InterruptedException ex) {
-            AgentMetricItem metricItem = getMetricItem(KEY_INLONG_GROUP_ID, 
groupId);
-            metricItem.pluginReadFailCount.incrementAndGet();
-            Thread.currentThread().interrupt();
+            this.metricItemReadFailed();
         }
         return false;
     }
 
     @Override
     public Message pull(long timeout, TimeUnit unit) {
-        String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
         try {
             Message message = queue.poll(timeout, unit);
             if (message != null) {
-                if (message instanceof ProxyMessage) {
-                    groupId = ((ProxyMessage) message).getInlongGroupId();
-                }
-                AgentMetricItem metricItem = 
getMetricItem(KEY_INLONG_GROUP_ID, groupId);
+                AgentMetricItem metricItem = getMetricItem(new HashMap<String, 
String>());
                 metricItem.pluginSendSuccessCount.incrementAndGet();
+                metricItem.pluginSendCount.incrementAndGet();
             }
             return message;
         } catch (InterruptedException ex) {
-            AgentMetricItem metricItem = getMetricItem(KEY_INLONG_GROUP_ID, 
groupId);
-            metricItem.pluginSendFailCount.incrementAndGet();
-            Thread.currentThread().interrupt();
+            this.metricItemSendFailed();
             throw new IllegalStateException(ex);
         }
     }
 
     @Override
     public void init(JobProfile jobConf) {
+        inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, 
DEFAULT_PROXY_INLONG_GROUP_ID);
+        inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, 
DEFAULT_PROXY_INLONG_STREAM_ID);
         queue = new LinkedBlockingQueue<>(
                 jobConf.getInt(AgentConstants.CHANNEL_MEMORY_CAPACITY,
                         AgentConstants.DEFAULT_CHANNEL_MEMORY_CAPACITY));
@@ -138,10 +129,37 @@ public class MemoryChannel implements Channel {
         LOGGER.info("destroy channel, show memory channel metric:");
     }
 
-    private AgentMetricItem getMetricItem(String otherKey, String value) {
+    private AgentMetricItem getMetricItem(Map<String, String> dimens) {
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
-        dimensions.put(otherKey, value);
+        dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId);
+        dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId);
+        dimens.forEach((key, value) -> {
+            dimensions.put(key, value);
+        });
         return this.metricItemSet.findMetricItem(dimensions);
     }
+
+    private void metricItemReadFailed() {
+        Map<String, String> dimensions = new HashMap<String, String>();
+        dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId);
+        dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId);
+        AgentMetricItem metricItem = getMetricItem(dimensions);
+        metricItem.pluginReadFailCount.incrementAndGet();
+        LOGGER.debug("plugin read failed:{}", dimensions.toString());
+        Thread.currentThread().interrupt();
+        return;
+    }
+
+    private void metricItemSendFailed() {
+        Map<String, String> dimensions = new HashMap<String, String>();
+        dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId);
+        dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId);
+        AgentMetricItem metricItem = getMetricItem(dimensions);
+        metricItem.pluginSendFailCount.incrementAndGet();
+        metricItem.pluginSendCount.incrementAndGet();
+        LOGGER.debug("plugin send failed:{}", dimensions.toString());
+        Thread.currentThread().interrupt();
+        return;
+    }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index dd164847d..9cc10d604 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -256,6 +256,7 @@ public class SenderManager {
             AgentUtils.silenceSleepInMs(retrySleepTime);
         }
         Map<String, String> dims = new HashMap<>();
+        dims.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
         dims.put(KEY_INLONG_GROUP_ID, groupId);
         dims.put(KEY_INLONG_STREAM_ID, streamId);
         try {
@@ -264,11 +265,13 @@ public class SenderManager {
             if (result == SendResult.OK) {
                 semaphore.release(bodyList.size());
                 
getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size());
+                getMetricItem(dims).pluginSendCount.addAndGet(bodyList.size());
                 long totalSize = bodyList.stream().mapToLong(body -> 
body.length).sum();
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
groupId, streamId, dataTime, bodyList.size(),
                         totalSize);
             } else {
                 
getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size());
+                getMetricItem(dims).pluginSendCount.addAndGet(bodyList.size());
                 LOGGER.warn("send data to dataproxy error {}", 
result.toString());
                 sendBatchSync(groupId, streamId, bodyList, retry + 1, 
dataTime, extraMap);
             }
@@ -279,6 +282,7 @@ public class SenderManager {
             try {
                 TimeUnit.SECONDS.sleep(1);
                 
getMetricItem(dims).pluginSendFailCount.addAndGet(bodyList.size());
+                getMetricItem(dims).pluginSendCount.addAndGet(bodyList.size());
                 sendBatchSync(groupId, streamId, bodyList, retry + 1, 
dataTime, extraMap);
             } catch (Exception ignored) {
                 // ignore it.
@@ -319,12 +323,14 @@ public class SenderManager {
             }
             semaphore.release(bodyList.size());
             Map<String, String> dims = new HashMap<>();
+            dims.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
             dims.put(KEY_INLONG_GROUP_ID, groupId);
             dims.put(KEY_INLONG_STREAM_ID, streamId);
             long totalSize = bodyList.stream().mapToLong(body -> 
body.length).sum();
             AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, 
streamId, dataTime, bodyList.size(),
                     totalSize);
             
getMetricItem(dims).pluginSendSuccessCount.addAndGet(bodyList.size());
+            getMetricItem(dims).pluginSendCount.addAndGet(bodyList.size());
             if (sourcePath != null) {
                 taskPositionManager.updateSinkPosition(jobId, sourcePath, 
bodyList.size());
             }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 3fab3899c..38877951b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -111,7 +111,6 @@ public class BinlogReader extends AbstractReader {
     @Override
     public Message read() {
         if (!binlogMessagesQueue.isEmpty()) {
-            readerMetric.pluginReadCount.incrementAndGet();
             return getBinlogMessage();
         } else {
             return null;
@@ -170,9 +169,11 @@ public class BinlogReader extends AbstractReader {
                         long dataSize = records.stream().mapToLong(r -> 
r.value().length()).sum();
                         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, 
inlongGroupId, inlongStreamId,
                                 System.currentTimeMillis(), records.size(), 
dataSize);
+                        
readerMetric.pluginReadSuccessCount.addAndGet(records.size());
                         readerMetric.pluginReadCount.addAndGet(records.size());
                     } catch (Exception e) {
                         
readerMetric.pluginReadFailCount.addAndGet(records.size());
+                        readerMetric.pluginReadCount.addAndGet(records.size());
                         LOGGER.error("parse binlog message error", e);
                     }
                 })
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
index 79e5479ee..52e72ed10 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/KafkaReader.java
@@ -121,6 +121,7 @@ public class KafkaReader<K, V> extends AbstractReader {
                         "partition:" + record.partition()
                                 + ", value:" + new String(recordValue) + ", 
offset:" + record.offset());
                 // control speed
+                readerMetric.pluginReadSuccessCount.incrementAndGet();
                 readerMetric.pluginReadCount.incrementAndGet();
                 // commit succeed,then record current offset
                 snapshot = record.partition() + 
JOB_KAFKA_PARTITION_OFFSET_DELIMITER + record.offset();
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
index c7f0341f5..3abe037bb 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java
@@ -136,7 +136,6 @@ public class MongoDBReader extends AbstractReader {
     @Override
     public Message read() {
         if (!bufferPool.isEmpty()) {
-            super.readerMetric.pluginReadCount.incrementAndGet();
             return this.pollMessage();
         } else {
             return null;
@@ -367,12 +366,13 @@ public class MongoDBReader extends AbstractReader {
             long dataSize = records.stream().mapToLong(c -> 
c.value().length()).sum();
             AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, 
super.inlongGroupId, super.inlongStreamId,
                     System.currentTimeMillis(), records.size(), dataSize);
+            readerMetric.pluginReadSuccessCount.addAndGet(records.size());
             readerMetric.pluginReadCount.addAndGet(records.size());
         } catch (InterruptedException e) {
             e.printStackTrace();
             LOGGER.error("parse mongo message error", e);
-
             readerMetric.pluginReadFailCount.addAndGet(records.size());
+            readerMetric.pluginReadCount.addAndGet(records.size());
         }
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
index dc7587bc7..23bfa1fbe 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java
@@ -108,7 +108,6 @@ public class PostgreSQLReader extends AbstractReader {
     @Override
     public Message read() {
         if (!postgreSQLMessageQueue.isEmpty()) {
-            readerMetric.pluginReadCount.incrementAndGet();
             return getPostgreSQLMessage();
         } else {
             return null;
@@ -165,9 +164,11 @@ public class PostgreSQLReader extends AbstractReader {
                         long dataSize = records.stream().mapToLong(c -> 
c.value().length()).sum();
                         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, 
inlongGroupId, inlongStreamId,
                                 System.currentTimeMillis(), records.size(), 
dataSize);
+                        
readerMetric.pluginReadSuccessCount.addAndGet(records.size());
                         readerMetric.pluginReadCount.addAndGet(records.size());
                     } catch (Exception e) {
                         
readerMetric.pluginReadFailCount.addAndGet(records.size());
+                        readerMetric.pluginReadCount.addAndGet(records.size());
                         LOGGER.error("parse binlog message error", e);
                     }
                 })
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
index 9edb14ba3..c4daa8560 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java
@@ -112,11 +112,13 @@ public class SQLServerReader extends AbstractReader {
             long dataSize = lineColumns.stream().mapToLong(column -> 
column.length()).sum();
             AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, 
inlongGroupId, inlongStreamId,
                     System.currentTimeMillis(), 1, dataSize);
+            readerMetric.pluginReadSuccessCount.incrementAndGet();
             readerMetric.pluginReadCount.incrementAndGet();
             return generateMessage(lineColumns);
         } catch (Exception ex) {
             LOGGER.error("error while reading data", ex);
             readerMetric.pluginReadFailCount.incrementAndGet();
+            readerMetric.pluginReadCount.incrementAndGet();
             throw new RuntimeException(ex);
         }
     }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
index b1b0c2280..130326462 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SqlReader.java
@@ -118,6 +118,7 @@ public class SqlReader extends AbstractReader {
                 long dataSize = lineColumns.stream().mapToLong(column -> 
column.length()).sum();
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS,
                         inlongGroupId, inlongStreamId, 
System.currentTimeMillis(), 1, dataSize);
+                readerMetric.pluginReadSuccessCount.incrementAndGet();
                 readerMetric.pluginReadCount.incrementAndGet();
                 return generateMessage(lineColumns);
             } else {
@@ -126,6 +127,7 @@ public class SqlReader extends AbstractReader {
         } catch (Exception ex) {
             LOGGER.error("error while reading data", ex);
             readerMetric.pluginReadFailCount.incrementAndGet();
+            readerMetric.pluginReadCount.incrementAndGet();
             throw new RuntimeException(ex);
         }
         return null;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 83d6e2a97..33e8816cc 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -93,6 +93,7 @@ public class FileReaderOperator extends AbstractReader {
             if (validateMessage(message)) {
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, 
inlongGroupId, inlongStreamId,
                         System.currentTimeMillis(), 1, message.length());
+                readerMetric.pluginReadSuccessCount.incrementAndGet();
                 readerMetric.pluginReadCount.incrementAndGet();
                 String proxyPartitionKey = 
jobConf.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
                 Map<String, String> header = new HashMap<>();
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
index 6b100fd84..7b07378b6 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java
@@ -93,6 +93,8 @@ public class TestSQLServerReader {
 
     private AtomicLong atomicLong;
 
+    private AtomicLong atomicCountLong;
+
     private String sql;
 
     @Before
@@ -107,6 +109,7 @@ public class TestSQLServerReader {
         final String groupId = "group01";
         final String streamId = "stream01";
         atomicLong = new AtomicLong(0L);
+        atomicCountLong = new AtomicLong(0L);
 
         sql = "select * from dbo.test01";
 
@@ -143,6 +146,7 @@ public class TestSQLServerReader {
         
whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
         
when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem);
         field(AgentMetricItem.class, "pluginReadCount").set(agentMetricItem, 
atomicLong);
+        field(AgentMetricItem.class, 
"pluginReadSuccessCount").set(agentMetricItem, atomicCountLong);
 
         //init method
         (reader = new SQLServerReader(sql)).init(jobProfile);

Reply via email to