This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new ab460eb support user-defined statsitem & upgrade rocketmq.version to
4.7.1 (#246)
ab460eb is described below
commit ab460eb068bfafd903616aaa8c1374f4a8b97580
Author: lizhiboo <[email protected]>
AuthorDate: Mon Aug 15 16:51:10 2022 +0800
support user-defined statsitem & upgrade rocketmq.version to 4.7.1 (#246)
---
connectors/rocketmq-connect-debezium/pom.xml | 2 +-
connectors/rocketmq-connect-jdbc/pom.xml | 2 +-
connectors/rocketmq-replicator/pom.xml | 2 +-
pom.xml | 2 +-
rocketmq-connect-cli/pom.xml | 2 +-
rocketmq-connect-runtime/pom.xml | 2 +-
.../connect/runtime/stats/ConnectStatsManager.java | 25 ++++++++++++++++++++++
7 files changed, 31 insertions(+), 6 deletions(-)
diff --git a/connectors/rocketmq-connect-debezium/pom.xml
b/connectors/rocketmq-connect-debezium/pom.xml
index 5f0e2ec..4c7d6c5 100644
--- a/connectors/rocketmq-connect-debezium/pom.xml
+++ b/connectors/rocketmq-connect-debezium/pom.xml
@@ -180,7 +180,7 @@
<debezium.version>1.7.2.Final</debezium.version>
<debezium.postgresql.version>42.3.3</debezium.postgresql.version>
<!--rocketmq version-->
- <rocketmq.version>4.7.0</rocketmq.version>
+ <rocketmq.version>4.7.1</rocketmq.version>
<rocketmq-openmessaging.version>4.3.2</rocketmq-openmessaging.version>
<!--rocketmq connect version-->
diff --git a/connectors/rocketmq-connect-jdbc/pom.xml
b/connectors/rocketmq-connect-jdbc/pom.xml
index 097bb3a..a4face9 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -40,7 +40,7 @@
<!-- Compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
- <rocketmq.version>4.7.0</rocketmq.version>
+ <rocketmq.version>4.7.1</rocketmq.version>
<!--test jar-->
<junit.version>4.13.1</junit.version>
diff --git a/connectors/rocketmq-replicator/pom.xml
b/connectors/rocketmq-replicator/pom.xml
index 95a72f3..5146e46 100644
--- a/connectors/rocketmq-replicator/pom.xml
+++ b/connectors/rocketmq-replicator/pom.xml
@@ -67,7 +67,7 @@
</build>
<properties>
- <rocketmq.version>4.7.0</rocketmq.version>
+ <rocketmq.version>4.7.1</rocketmq.version>
<openmessaging.connector.version>0.1.3</openmessaging.connector.version>
<junit.version>4.13.1</junit.version>
<mockito.version>3.2.4</mockito.version>
diff --git a/pom.xml b/pom.xml
index c4b4e16..7869245 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,7 @@
</license>
</licenses>
<properties>
- <rocketmq.version>4.7.0</rocketmq.version>
+ <rocketmq.version>4.7.1</rocketmq.version>
<junit.version>4.13.1</junit.version>
<assertj.version>3.22.0</assertj.version>
<mockito.version>3.2.4</mockito.version>
diff --git a/rocketmq-connect-cli/pom.xml b/rocketmq-connect-cli/pom.xml
index 8b7976c..a685adc 100644
--- a/rocketmq-connect-cli/pom.xml
+++ b/rocketmq-connect-cli/pom.xml
@@ -36,7 +36,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<!-- RocketMQ Version-->
- <rocketmq.version>4.7.0</rocketmq.version>
+ <rocketmq.version>4.7.1</rocketmq.version>
</properties>
<build>
diff --git a/rocketmq-connect-runtime/pom.xml b/rocketmq-connect-runtime/pom.xml
index bd31d7c..b00929d 100644
--- a/rocketmq-connect-runtime/pom.xml
+++ b/rocketmq-connect-runtime/pom.xml
@@ -43,7 +43,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<!-- RocketMQ Version-->
- <rocketmq.version>4.7.0</rocketmq.version>
+ <rocketmq.version>4.7.1</rocketmq.version>
</properties>
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
index 72e10bf..c02b0d8 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.connect.runtime.stats;
import java.util.HashMap;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -295,4 +296,28 @@ public class ConnectStatsManager {
public void incSinkRecordReadTotalTimes() {
this.statsTable.get(SINK_RECORD_READ_TOTAL_TIMES).addValue(worker, 1,
1);
}
+
+ public void initAdditionalItems(List<String> additionalItems) {
+ for (String additionalItem : additionalItems) {
+ if (this.statsTable.containsKey(additionalItem)) {
+ log.warn("Already exists statsItem : " + additionalItem + ",
just skip");
+ continue;
+ }
+ this.statsTable.put(additionalItem, new
StatsItemSet(additionalItem, scheduledExecutorService, log));
+ }
+ }
+
+ public void incAdditionalItem(String additionalItem, String key, int
incValue, int incTimes) {
+ StatsItemSet statsItemSet = this.statsTable.get(additionalItem);
+ if (statsItemSet != null) {
+ statsItemSet.addValue(key, incValue, incTimes);
+ }
+ }
+
+ public void removeAdditionalItem(String additionalItem, String key) {
+ StatsItemSet statsItemSet = this.statsTable.get(additionalItem);
+ if (statsItemSet != null) {
+ statsItemSet.delValue(key);
+ }
+ }
}