This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 360933b885d branch-2.1: [feat](binlog) Speed binlog gc by locked binlogs #47547 (#48128)
360933b885d is described below
commit 360933b885d69d33723d56f44b73e846919a7a96
Author: walter <[email protected]>
AuthorDate: Thu Feb 20 17:08:26 2025 +0800
branch-2.1: [feat](binlog) Speed binlog gc by locked binlogs #47547 (#48128)
Cherry-picked from #47547.
---
.../main/java/org/apache/doris/common/Config.java | 2 +-
.../org/apache/doris/binlog/BinlogManager.java | 5 ++-
.../java/org/apache/doris/binlog/DBBinlog.java | 45 +++++++++++++++++++---
.../java/org/apache/doris/binlog/TableBinlog.java | 20 +++++++++-
4 files changed, 63 insertions(+), 9 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index bbfa2f30e3a..5deaf8b51d5 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2417,7 +2417,7 @@ public class Config extends ConfigBase {
public static int max_binlog_messsage_size = 1024 * 1024 * 1024;
@ConfField(mutable = true, masterOnly = true, description = {
- "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
+ "是否禁止使用 WITH RESOURCE 语句创建 Catalog。",
"Whether to disable creating catalog with WITH RESOURCE
statement."})
public static boolean disallow_create_catalog_with_resource = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 2403ede6fb3..d4bf72a1e9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -112,6 +112,10 @@ public class BinlogManager {
return;
}
+ LOG.debug("add binlog, db {}, table {}, commitSeq {}, timestamp {},
type {}, data {}",
+ binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(),
binlog.getTimestamp(), binlog.getType(),
+ binlog.getData());
+
DBBinlog dbBinlog;
lock.writeLock().lock();
try {
@@ -589,7 +593,6 @@ public class BinlogManager {
return tombstones;
}
-
public void replayGc(BinlogGcInfo binlogGcInfo) {
lock.writeLock().lock();
Map<Long, DBBinlog> gcDbBinlogMap;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 675c9dc78a8..bd7fd184426 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -43,6 +43,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@@ -288,7 +289,6 @@ public class DBBinlog {
public Pair<TStatus, Long> lockBinlog(long tableId, String jobUniqueId,
long lockCommitSeq) {
TableBinlog tableBinlog = null;
-
lock.writeLock().lock();
try {
if (tableId < 0) {
@@ -457,20 +457,43 @@ public class DBBinlog {
}
if (lastExpiredBinlog != null) {
- dummy.setCommitSeq(lastExpiredBinlog.getCommitSeq());
+ final long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
+ dummy.setCommitSeq(expiredCommitSeq);
// release expired timestamps by commit seq.
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
- while (timeIter.hasNext() && timeIter.next().first <=
lastExpiredBinlog.getCommitSeq()) {
+ while (timeIter.hasNext() && timeIter.next().first <=
expiredCommitSeq) {
timeIter.remove();
}
- gcDroppedResources(lastExpiredBinlog.getCommitSeq());
+ lockedBinlogs.entrySet().removeIf(ent -> ent.getValue() <=
expiredCommitSeq);
+ gcDroppedResources(expiredCommitSeq);
}
return lastExpiredBinlog;
}
+ private Optional<Long> getMinLockedCommitSeq() {
+ lock.readLock().lock();
+ try {
+ Optional<Long> minLockedCommitSeq =
lockedBinlogs.values().stream().min(Long::compareTo);
+ for (TableBinlog tableBinlog : tableBinlogMap.values()) {
+ Optional<Long> tableMinLockedCommitSeq =
tableBinlog.getMinLockedCommitSeq();
+ if (!tableMinLockedCommitSeq.isPresent()) {
+ continue;
+ }
+ if (minLockedCommitSeq.isPresent()) {
+ minLockedCommitSeq =
Optional.of(Math.min(minLockedCommitSeq.get(), tableMinLockedCommitSeq.get()));
+ } else {
+ minLockedCommitSeq = tableMinLockedCommitSeq;
+ }
+ }
+ return minLockedCommitSeq;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) {
long ttlSeconds = dbBinlogConfig.getTtlSeconds();
long maxBytes = dbBinlogConfig.getMaxBytes();
@@ -481,10 +504,12 @@ public class DBBinlog {
dbId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums);
// step 1: get current tableBinlog info and expiredCommitSeq
+ Optional<Long> minLockedCommitSeq = getMinLockedCommitSeq();
TBinlog lastExpiredBinlog = null;
+ List<TableBinlog> tableBinlogs = Lists.newArrayList();
lock.writeLock().lock();
try {
- long expiredCommitSeq = -1;
+ long expiredCommitSeq = -1L;
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
while (timeIter.hasNext()) {
Pair<Long, Long> pair = timeIter.next();
@@ -494,6 +519,13 @@ public class DBBinlog {
expiredCommitSeq = pair.first;
}
+ // Speed up gc by recycling binlogs that are not locked by syncer.
+ // To keep compatible with the old version, if no binlog is locked
here, fallthrough to the
+ // previous behavior (keep the entire binlogs until it is expired).
+ if (minLockedCommitSeq.isPresent() && expiredCommitSeq + 1L <
minLockedCommitSeq.get()) {
+ expiredCommitSeq = minLockedCommitSeq.get() - 1L;
+ }
+
final long lastExpiredCommitSeq = expiredCommitSeq;
BinlogComparator checker = (binlog) -> {
// NOTE: TreeSet read size during iterator remove is valid.
@@ -507,6 +539,7 @@ public class DBBinlog {
|| maxHistoryNums < allBinlogs.size();
};
lastExpiredBinlog = getLastExpiredBinlog(checker);
+ tableBinlogs.addAll(tableBinlogMap.values());
} finally {
lock.writeLock().unlock();
}
@@ -518,7 +551,7 @@ public class DBBinlog {
// step 2: gc every tableBinlog in dbBinlog, get table tombstone to
complete db
// tombstone
List<BinlogTombstone> tableTombstones = Lists.newArrayList();
- for (TableBinlog tableBinlog : tableBinlogMap.values()) {
+ for (TableBinlog tableBinlog : tableBinlogs) {
// step 2.1: gc tableBinlog,and get table tombstone
BinlogTombstone tableTombstone =
tableBinlog.commitSeqGc(lastExpiredBinlog.getCommitSeq());
if (tableTombstone != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index b6cf328eccc..cef60c85ac4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -39,6 +39,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -171,6 +172,15 @@ public class TableBinlog {
return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq);
}
+ public Optional<Long> getMinLockedCommitSeq() {
+ lock.readLock().lock();
+ try {
+ return lockedBinlogs.values().stream().min(Long::compareTo);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
private Pair<TBinlog, Long>
getLastUpsertAndLargestCommitSeq(BinlogComparator checker) {
if (binlogs.size() <= 1) {
return null;
@@ -199,7 +209,7 @@ public class TableBinlog {
return null;
}
- long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
+ final long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
dummyBinlog.setCommitSeq(expiredCommitSeq);
Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
@@ -207,6 +217,7 @@ public class TableBinlog {
timeIterator.remove();
}
+ lockedBinlogs.entrySet().removeIf(ent -> ent.getValue() <=
expiredCommitSeq);
return Pair.of(tombstoneUpsert, expiredCommitSeq);
}
@@ -279,6 +290,13 @@ public class TableBinlog {
}
expiredCommitSeq = entry.first;
}
+
+ // find the min locked binlog commit seq, if not exists, use
the last binlog commit seq.
+ Optional<Long> minLockedCommitSeq =
lockedBinlogs.values().stream().min(Long::compareTo);
+ if (minLockedCommitSeq.isPresent() && expiredCommitSeq + 1L <
minLockedCommitSeq.get()) {
+ // Speed up the gc progress by the min locked commit seq.
+ expiredCommitSeq = minLockedCommitSeq.get() - 1L;
+ }
}
final long lastExpiredCommitSeq = expiredCommitSeq;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org