This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 68978dbb4e [Bugfix][cdc base] Fix negative values in CDCRecordEmitDelay metric (#6259) 68978dbb4e is described below commit 68978dbb4eb4299a23d5722adf0468798fdbc795 Author: hailin0 <wanghai...@apache.org> AuthorDate: Wed Jan 24 10:04:26 2024 +0800 [Bugfix][cdc base] Fix negative values in CDCRecordEmitDelay metric (#6259) --- .../cdc/base/source/reader/IncrementalSourceRecordEmitter.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java index eacb427acb..3fbbd744b9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.java @@ -99,10 +99,12 @@ public class IncrementalSourceRecordEmitter<T> // report fetch delay Long fetchTimestamp = getFetchTimestamp(element); if (fetchTimestamp != null) { - recordFetchDelay.set(fetchTimestamp - messageTimestamp); + long fetchDelay = fetchTimestamp - messageTimestamp; + recordFetchDelay.set(fetchDelay > 0 ? fetchDelay : 0); } // report emit delay - recordEmitDelay.set(now - messageTimestamp); + long emitDelay = now - messageTimestamp; + recordEmitDelay.set(emitDelay > 0 ? emitDelay : 0); } }