This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new bbe4cb7  [ISSUE #124]Some sink /source metric statistics are incorrect 
(#126)
bbe4cb7 is described below

commit bbe4cb780f8171a87ec4f27ba4f9183ab09fccea
Author: zhangjidi2016 <[email protected]>
AuthorDate: Wed May 18 13:49:17 2022 +0800

    [ISSUE #124]Some sink /source metric statistics are incorrect (#126)
    
    * [ISSUE #124]Some sink /source metric statistics are incorrect
    
    * add source poll times and sink read times statistical
    
    Co-authored-by: zhangjidi <[email protected]>
---
 .../runtime/connectorwrapper/WorkerSinkTask.java   |  5 ++++-
 .../runtime/connectorwrapper/WorkerSourceTask.java |  9 +++++---
 .../connect/runtime/stats/ConnectStatsManager.java | 26 +++++++++++++++++-----
 3 files changed, 30 insertions(+), 10 deletions(-)

diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index dd6b8bc..8f95747 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -233,6 +233,9 @@ public class WorkerSinkTask implements WorkerTask {
                     log.error(" sink task {},pull message MQClientException, 
Error {} ", this, e.getMessage(), e);
                     connectStatsManager.incSinkRecordPutTotalFailNums();
                     
connectStatsManager.incSinkRecordPutFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+                } finally {
+                    // record sink read times
+                    connectStatsManager.incSinkRecordReadTotalTimes();
                 }
             }
 
@@ -421,7 +424,7 @@ public class WorkerSinkTask implements WorkerTask {
             if (null != pullResult && 
pullResult.getPullStatus().equals(PullStatus.FOUND)) {
                 this.incPullTPS(entry.getKey().getTopic(), 
pullResult.getMsgFoundList().size());
                 messages = pullResult.getMsgFoundList();
-                connectStatsManager.incSinkRecordReadTotalNums();
+                
connectStatsManager.incSinkRecordReadTotalNums(messages.size());
                 
connectStatsManager.incSinkRecordReadNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID),
 messages.size());
                 long pullRT = System.currentTimeMillis() - 
beginPullMsgTimestamp;
                 connectStatsManager.incSinkRecordReadTotalRT(pullRT);
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 955d770..4c7eee8 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -179,8 +179,8 @@ public class WorkerSourceTask implements WorkerTask {
                     try {
                         toSendRecord = poll();
                         if (null != toSendRecord && toSendRecord.size() > 0) {
-                            connectStatsManager.incSourceRecordPollTotalNums();
-                            
connectStatsManager.incSourceRecordPollNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
+                            
connectStatsManager.incSourceRecordPollTotalNums(toSendRecord.size());
+                            
connectStatsManager.incSourceRecordPollNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID),
 toSendRecord.size());
                             sendRecord();
                         }
                     } catch (RetriableException e) {
@@ -190,8 +190,11 @@ public class WorkerSourceTask implements WorkerTask {
                     } catch (Exception e) {
                         connectStatsManager.incSourceRecordPollTotalFailNums();
                         
connectStatsManager.incSourceRecordPollFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
-                        log.error("Source task RetriableException exception", 
e);
+                        log.error("Source task Exception exception", e);
                         state.set(WorkerTaskState.ERROR);
+                    } finally {
+                        // record source poll times
+                        connectStatsManager.incSourceRecordPollTotalTimes();
                     }
                 }
                 AtomicLong atomicLong = 
connectStatsService.singleSourceTaskTimesTotal(taskConfig.getString(RuntimeConfigDefine.TASK_ID));
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
index b7a03e9..72e10bf 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
@@ -68,6 +68,9 @@ public class ConnectStatsManager {
     public static final String SINK_RECORD_PUT_FAIL_RT = 
"SINK_RECORD_PUT_FAIL_RT";
     public static final String SINK_RECORD_PUT_TOTAL_FAIL_RT = 
"SINK_RECORD_PUT_TOTAL_FAIL_RT";
 
+    public static final String SOURCE_RECORD_POLL_TOTAL_TIMES = 
"SOURCE_RECORD_POLL_TOTAL_TIMES";
+    public static final String SINK_RECORD_READ_TOTAL_TIMES = 
"SINK_RECORD_READ_TOTAL_TIMES";
+
     /**
      * read disk follow stats
      */
@@ -119,6 +122,9 @@ public class ConnectStatsManager {
         this.statsTable.put(SINK_RECORD_PUT_TOTAL_RT, new 
StatsItemSet(SINK_RECORD_PUT_TOTAL_RT, this.scheduledExecutorService, log));
         this.statsTable.put(SINK_RECORD_PUT_FAIL_RT, new 
StatsItemSet(SINK_RECORD_PUT_FAIL_RT, this.scheduledExecutorService, log));
         this.statsTable.put(SINK_RECORD_PUT_TOTAL_FAIL_RT, new 
StatsItemSet(SINK_RECORD_PUT_TOTAL_FAIL_RT, this.scheduledExecutorService, 
log));
+
+        this.statsTable.put(SOURCE_RECORD_POLL_TOTAL_TIMES, new 
StatsItemSet(SOURCE_RECORD_POLL_TOTAL_TIMES, this.scheduledExecutorService, 
log));
+        this.statsTable.put(SINK_RECORD_READ_TOTAL_TIMES, new 
StatsItemSet(SINK_RECORD_READ_TOTAL_TIMES, this.scheduledExecutorService, log));
     }
 
     public void start() {
@@ -138,16 +144,16 @@ public class ConnectStatsManager {
         return null;
     }
 
-    public void incSourceRecordPollTotalNums() {
-        this.statsTable.get(SOURCE_RECORD_POLL_TOTAL_NUMS).addValue(worker, 1, 
1);
+    public void incSourceRecordPollTotalNums(int incValue) {
+        this.statsTable.get(SOURCE_RECORD_POLL_TOTAL_NUMS).addValue(worker, 
incValue, 1);
 
     }
 
-    public void incSourceRecordPollNums(String taskId) {
+    public void incSourceRecordPollNums(String taskId, int incValue) {
         if (StringUtils.isBlank(taskId)) {
             return;
         }
-        this.statsTable.get(SOURCE_RECORD_POLL_NUMS).addValue(taskId, 1, 1);
+        this.statsTable.get(SOURCE_RECORD_POLL_NUMS).addValue(taskId, 
incValue, 1);
     }
 
     public void incSourceRecordPollTotalFailNums() {
@@ -206,8 +212,8 @@ public class ConnectStatsManager {
         this.statsTable.get(SINK_RECORD_READ_FAIL_NUMS).addValue(taskId, 1, 1);
     }
 
-    public void incSinkRecordReadTotalNums() {
-        this.statsTable.get(SINK_RECORD_READ_TOTAL_NUMS).addValue(worker, 1, 
1);
+    public void incSinkRecordReadTotalNums(int incValue) {
+        this.statsTable.get(SINK_RECORD_READ_TOTAL_NUMS).addValue(worker, 
incValue, 1);
     }
 
     public void incSinkRecordReadNums(String taskId) {
@@ -281,4 +287,12 @@ public class ConnectStatsManager {
         }
         this.statsTable.get(SINK_RECORD_PUT_RT).addValue(taskId, (int) rt, 1);
     }
+
+    public void incSourceRecordPollTotalTimes() {
+        this.statsTable.get(SOURCE_RECORD_POLL_TOTAL_TIMES).addValue(worker, 
1, 1);
+    }
+
+    public void incSinkRecordReadTotalTimes() {
+        this.statsTable.get(SINK_RECORD_READ_TOTAL_TIMES).addValue(worker, 1, 
1);
+    }
 }

Reply via email to