This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 924df34b9 [INLONG-6471][Sort] MySQL connector metric restore lost init data for using sourceFunction (#6474) 924df34b9 is described below commit 924df34b9362c276a5183c7ef18c69da125e38ef Author: Xin Gong <genzhedangd...@gmail.com> AuthorDate: Wed Nov 9 10:35:53 2022 +0800 [INLONG-6471][Sort] MySQL connector metric restore lost init data for using sourceFunction (#6474) --- .../org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java index 80d6812f2..5ade85306 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java @@ -80,6 +80,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; @@ -438,6 +440,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(inlongAudit) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) {