This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 90d96f1112b [improve](binlog) Add and adjust result for get_lag (#48953) 90d96f1112b is described below commit 90d96f1112b32206a7e9c9482b2b7c7e40c029ba Author: Uniqueyou <wangyix...@selectdb.com> AuthorDate: Fri Mar 14 10:29:15 2025 +0800 [improve](binlog) Add and adjust result for get_lag (#48953) --- .../org/apache/doris/binlog/BinlogLagInfo.java | 17 +++++++++++++- .../java/org/apache/doris/binlog/BinlogUtils.java | 27 +++++++++++----------- .../apache/doris/service/FrontendServiceImpl.java | 2 ++ gensrc/thrift/FrontendService.thrift | 2 ++ 4 files changed, 34 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java index 83b4181fa2f..4328958d879 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java @@ -23,13 +23,18 @@ public class BinlogLagInfo { private long lastCommitSeq; private long firstCommitTs; private long lastCommitTs; + private long nextCommitSeq; + private long nextCommitTs; - public BinlogLagInfo(long lag, long firstCommitSeq, long lastCommitSeq, long firstCommitTs, long lastCommitTs) { + public BinlogLagInfo(long lag, long firstCommitSeq, long lastCommitSeq, long firstCommitTs, long lastCommitTs, + long nextCommitSeq, long nextCommitTs) { this.lag = lag; this.firstCommitSeq = firstCommitSeq; this.lastCommitSeq = lastCommitSeq; this.firstCommitTs = firstCommitTs; this.lastCommitTs = lastCommitTs; + this.nextCommitSeq = nextCommitSeq; + this.nextCommitTs = nextCommitTs; } public BinlogLagInfo() { @@ -38,6 +43,16 @@ public class BinlogLagInfo { lastCommitSeq = 0; firstCommitTs = 0; lastCommitTs = 0; + nextCommitSeq = 0; + nextCommitTs = 0; + } + + public long getNextCommitSeq() { + return nextCommitSeq; + } + + public long getNextCommitTs() { + return 
nextCommitTs; } public long getLag() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java index 66350cec0d3..0347b94c530 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java @@ -68,30 +68,31 @@ public class BinlogUtils { if (firstBinlog.getCommitSeq() > prevCommitSeq) { BinlogLagInfo lagInfo = new BinlogLagInfo(binlogs.size(), firstBinlog.getCommitSeq(), - lastBinlog.getCommitSeq(), firstBinlog.getTimestamp(), lastBinlog.getTimestamp()); + lastBinlog.getCommitSeq(), firstBinlog.getTimestamp(), lastBinlog.getTimestamp(), + firstBinlog.getCommitSeq(), firstBinlog.getTimestamp()); return Pair.of(status, lagInfo); } // find first binlog whose commitSeq > commitSeq TBinlog guard = new TBinlog(); guard.setCommitSeq(prevCommitSeq); - TBinlog binlog = binlogs.higher(guard); + TBinlog nextBinlog = binlogs.higher(guard); // all prevCommitSeq <= commitSeq + long lastCommitSeq = lastBinlog.getCommitSeq(); + long lastCommitTs = lastBinlog.getTimestamp(); + long firstCommitSeq = firstBinlog.getCommitSeq(); + long firstCommitTs = firstBinlog.getTimestamp(); long lag = 0; - long lastCommitSeq = 0; - long lastCommitTs = 0; - long firstCommitSeq = 0; - long firstCommitTs = 0; - if (binlog != null) { - lag = binlogs.tailSet(binlog).size(); - firstCommitSeq = binlog.getCommitSeq(); - firstCommitTs = binlog.getTimestamp(); - lastCommitSeq = lastBinlog.getCommitSeq(); - lastCommitTs = lastBinlog.getTimestamp(); + long nextCommitSeq = 0; + long nextCommitTs = 0; + if (nextBinlog != null) { + lag = binlogs.tailSet(nextBinlog).size(); + nextCommitSeq = nextBinlog.getCommitSeq(); + nextCommitTs = nextBinlog.getTimestamp(); } return Pair.of(status, new BinlogLagInfo(lag, firstCommitSeq, lastCommitSeq, - firstCommitTs, lastCommitTs)); + firstCommitTs, lastCommitTs, nextCommitSeq, nextCommitTs)); } 
public static TBinlog newDummyBinlog(long dbId, long tableId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 694601af814..2928d84e549 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3362,6 +3362,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.setLastCommitSeq(lagInfo.getLastCommitSeq()); result.setFirstBinlogTimestamp(lagInfo.getFirstCommitTs()); result.setLastBinlogTimestamp(lagInfo.getLastCommitTs()); + result.setNextCommitSeq(lagInfo.getNextCommitSeq()); + result.setNextBinlogTimestamp(lagInfo.getNextCommitTs()); } return result; } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 641c033f676..7a13bd504cb 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1301,6 +1301,8 @@ struct TGetBinlogLagResult { 5: optional i64 last_commit_seq 6: optional i64 first_binlog_timestamp 7: optional i64 last_binlog_timestamp + 8: optional i64 next_commit_seq + 9: optional i64 next_binlog_timestamp } struct TUpdateFollowerStatsCacheRequest { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org