This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new acf741fa80a [feature](binlog) Support gc binlogs by history nums and size (#35250)
acf741fa80a is described below
commit acf741fa80a3d03dc3e4bd39f3d9ed122ffdcebc
Author: walter <[email protected]>
AuthorDate: Thu May 23 14:39:57 2024 +0800
[feature](binlog) Support gc binlogs by history nums and size (#35250)
* [chore](binlog) Add logs about binlog gc (#34359)
* [feature](binlog) Support gc binlogs by history nums and size (#34888)
---
.../org/apache/doris/binlog/BinlogComparator.java | 2 +-
.../org/apache/doris/binlog/BinlogConfigCache.java | 5 +-
.../java/org/apache/doris/binlog/BinlogGcer.java | 2 +-
.../org/apache/doris/binlog/BinlogManager.java | 5 +-
.../java/org/apache/doris/binlog/BinlogUtils.java | 8 ++
.../java/org/apache/doris/binlog/DBBinlog.java | 142 +++++++++++++--------
.../java/org/apache/doris/binlog/TableBinlog.java | 103 ++++++++++-----
.../org/apache/doris/binlog/BinlogManagerTest.java | 11 +-
.../org/apache/doris/binlog/TableBinlogTest.java | 2 +-
9 files changed, 180 insertions(+), 100 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java
index 9e35cc3bd61..edc01782f31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java
@@ -20,5 +20,5 @@ package org.apache.doris.binlog;
import org.apache.doris.thrift.TBinlog;
public interface BinlogComparator {
- boolean isExpired(TBinlog binlog, long expired);
+ boolean isExpired(TBinlog binlog);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
index 30641bae8c6..b07f5e5d87c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java
@@ -41,6 +41,8 @@ public class BinlogConfigCache {
lock = new ReentrantReadWriteLock();
}
+ // Get the binlog config of the specified db, return null if no such database
+ // exists.
public BinlogConfig getDBBinlogConfig(long dbId) {
lock.readLock().lock();
BinlogConfig binlogConfig = dbTableBinlogEnableMap.get(dbId);
@@ -110,7 +112,8 @@ public class BinlogConfigCache {
OlapTable olapTable = (OlapTable) table;
tableBinlogConfig = olapTable.getBinlogConfig();
// get table binlog config, when table modify binlogConfig
- // it create a new binlog, not update inplace, so we don't need to clone binlogConfig
+ // it create a new binlog, not update inplace, so we don't need to clone
+ // binlogConfig
dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
return tableBinlogConfig;
} catch (Exception e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
index 70118076114..c3e14e4955b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
@@ -58,7 +58,7 @@ public class BinlogGcer extends MasterDaemon {
try {
List<BinlogTombstone> tombstones =
Env.getCurrentEnv().getBinlogManager().gc();
if (tombstones != null && !tombstones.isEmpty()) {
- LOG.info("tomebstones size: {}", tombstones.size());
+ LOG.info("tombstones size: {}", tombstones.size());
} else {
LOG.info("no gc binlog");
return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 96d0f7f4e13..454f678e2e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -58,9 +58,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class BinlogManager {
private static final int BUFFER_SIZE = 16 * 1024;
private static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>().add("Name")
-
.add("Type").add("Id").add("Dropped").add("BinlogLength").add("FirstBinlogCommittedTime")
+
.add("Type").add("Id").add("Dropped").add("BinlogLength").add("BinlogSize").add("FirstBinlogCommittedTime")
.add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime")
- .add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds")
+
.add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds").add("BinlogMaxBytes")
+ .add("BinlogMaxHistoryNums")
.build();
private static final Logger LOG =
LogManager.getLogger(BinlogManager.class);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index 0f6c2308248..6b79fab143b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -81,6 +81,7 @@ public class BinlogUtils {
return dummy;
}
+ // Compute the expired timestamp in milliseconds.
public static long getExpiredMs(long ttlSeconds) {
long currentSeconds = System.currentTimeMillis() / 1000;
if (currentSeconds < ttlSeconds) {
@@ -94,4 +95,11 @@ public class BinlogUtils {
public static String convertTimeToReadable(long time) {
return new java.text.SimpleDateFormat("yyyy-MM-dd
HH:mm:ss").format(new java.util.Date(time));
}
+
+ public static long getApproximateMemoryUsage(TBinlog binlog) {
+ /* object layout: header + body + padding */
+ final long objSize = 80; // 9 fields and 1 header
+ String data = binlog.getData();
+ return objSize + binlog.getTableIdsSize() * 8 + (data == null ? 0 :
data.length());
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index a3133bfb5c7..79e1adf20c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -45,6 +45,8 @@ public class DBBinlog {
private static final Logger LOG =
LogManager.getLogger(BinlogManager.class);
private long dbId;
+ // The size of all binlogs.
+ private long binlogSize;
// guard for allBinlogs && tableBinlogMap
private ReentrantReadWriteLock lock;
// all binlogs contain table binlogs && create table binlog etc ...
@@ -64,6 +66,7 @@ public class DBBinlog {
lock = new ReentrantReadWriteLock();
this.dbId = binlog.getDbId();
this.binlogConfigCache = binlogConfigCache;
+ this.binlogSize = 0;
// allBinlogs treeset order by commitSeq
allBinlogs =
Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
@@ -81,7 +84,7 @@ public class DBBinlog {
}
public static DBBinlog recoverDbBinlog(BinlogConfigCache
binlogConfigCache, TBinlog dbDummy,
- List<TBinlog> tableDummies, boolean
dbBinlogEnable) {
+ List<TBinlog> tableDummies, boolean dbBinlogEnable) {
DBBinlog dbBinlog = new DBBinlog(binlogConfigCache, dbDummy);
long dbId = dbDummy.getDbId();
for (TBinlog tableDummy : tableDummies) {
@@ -105,6 +108,7 @@ public class DBBinlog {
}
allBinlogs.add(binlog);
+ binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
if (tableIds == null) {
return;
@@ -119,12 +123,13 @@ public class DBBinlog {
}
}
- // TODO(Drogon): remove TableBinlog after DropTable, think table drop && recovery
+ // TODO(Drogon): remove TableBinlog after DropTable, think table drop &&
+ // recovery
private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean
dbBinlogEnable) {
TableBinlog tableBinlog = tableBinlogMap.get(tableId);
if (tableBinlog == null) {
if (dbBinlogEnable || binlogConfigCache.isEnableTable(dbId,
tableId)) {
- tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId,
tableId);
+ tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId,
tableId);
tableBinlogMap.put(tableId, tableBinlog);
tableDummyBinlogs.add(tableBinlog.getDummyBinlog());
}
@@ -132,7 +137,8 @@ public class DBBinlog {
return tableBinlog;
}
- // guard by BinlogManager, if addBinlog called, more than one(db/tables) enable binlog
+ // guard by BinlogManager, if addBinlog called, more than one(db/tables) enable
+ // binlog
public void addBinlog(TBinlog binlog) {
boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
List<Long> tableIds = binlog.getTableIds();
@@ -140,6 +146,7 @@ public class DBBinlog {
lock.writeLock().lock();
try {
allBinlogs.add(binlog);
+ binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
if (binlog.getTimestamp() > 0 && dbBinlogEnable) {
timestamps.add(Pair.of(binlog.getCommitSeq(),
binlog.getTimestamp()));
@@ -226,14 +233,10 @@ public class DBBinlog {
return null;
}
- boolean dbBinlogEnable = dbBinlogConfig.isEnable();
BinlogTombstone tombstone;
- if (dbBinlogEnable) {
+ if (dbBinlogConfig.isEnable()) {
// db binlog is enabled, only one binlogTombstones
- long ttlSeconds = dbBinlogConfig.getTtlSeconds();
- long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
-
- tombstone = dbBinlogEnableGc(expiredMs);
+ tombstone = dbBinlogEnableGc(dbBinlogConfig);
} else {
tombstone = dbBinlogDisableGc();
}
@@ -277,7 +280,7 @@ public class DBBinlog {
}
for (TableBinlog tableBinlog : tableBinlogs) {
- BinlogTombstone tombstone = tableBinlog.ttlGc();
+ BinlogTombstone tombstone = tableBinlog.gc();
if (tombstone != null) {
tombstones.add(tombstone);
}
@@ -297,6 +300,7 @@ public class DBBinlog {
TBinlog dummy = binlogIter.next();
boolean foundFirstUsingBinlog = false;
long lastCommitSeq = -1;
+ long removed = 0;
while (binlogIter.hasNext()) {
TBinlog binlog = binlogIter.next();
@@ -304,6 +308,8 @@ public class DBBinlog {
if (commitSeq <= largestExpiredCommitSeq) {
if (binlog.table_ref <= 0) {
binlogIter.remove();
+ binlogSize -=
BinlogUtils.getApproximateMemoryUsage(binlog);
+ ++removed;
if (!foundFirstUsingBinlog) {
lastCommitSeq = commitSeq;
}
@@ -318,52 +324,92 @@ public class DBBinlog {
if (lastCommitSeq != -1) {
dummy.setCommitSeq(lastCommitSeq);
}
+
+ LOG.info("remove {} expired binlogs, dbId: {}, left: {}", removed,
dbId, allBinlogs.size());
} finally {
lock.writeLock().unlock();
}
}
- private BinlogTombstone dbBinlogEnableGc(long expiredMs) {
+ private TBinlog getLastExpiredBinlog(BinlogComparator checker) {
+ TBinlog lastExpiredBinlog = null;
+
+ Iterator<TBinlog> binlogIter = allBinlogs.iterator();
+ TBinlog dummy = binlogIter.next();
+ while (binlogIter.hasNext()) {
+ TBinlog binlog = binlogIter.next();
+ if (checker.isExpired(binlog)) {
+ binlogIter.remove();
+ binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog);
+ lastExpiredBinlog = binlog;
+ } else {
+ break;
+ }
+ }
+
+ if (lastExpiredBinlog != null) {
+ dummy.setCommitSeq(lastExpiredBinlog.getCommitSeq());
+
+ // release expired timestamps by commit seq.
+ Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
+ while (timeIter.hasNext() && timeIter.next().first <=
lastExpiredBinlog.getCommitSeq()) {
+ timeIter.remove();
+ }
+ }
+
+ return lastExpiredBinlog;
+ }
+
+ private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) {
+ long ttlSeconds = dbBinlogConfig.getTtlSeconds();
+ long maxBytes = dbBinlogConfig.getMaxBytes();
+ long maxHistoryNums = dbBinlogConfig.getMaxHistoryNums();
+ long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
+
+ LOG.info("gc db binlog. dbId: {}, expiredMs: {}, ttlSecond: {},
maxBytes: {}, maxHistoryNums: {}",
+ dbId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums);
+
// step 1: get current tableBinlog info and expiredCommitSeq
- long expiredCommitSeq = -1;
+ TBinlog lastExpiredBinlog = null;
lock.writeLock().lock();
try {
+ long expiredCommitSeq = -1;
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
while (timeIter.hasNext()) {
Pair<Long, Long> pair = timeIter.next();
- if (pair.second <= expiredMs) {
- expiredCommitSeq = pair.first;
- timeIter.remove();
- } else {
+ if (pair.second > expiredMs) {
break;
}
+ expiredCommitSeq = pair.first;
}
- Iterator<TBinlog> binlogIter = allBinlogs.iterator();
- TBinlog dummy = binlogIter.next();
- dummy.setCommitSeq(expiredCommitSeq);
-
- while (binlogIter.hasNext()) {
- TBinlog binlog = binlogIter.next();
- if (binlog.getCommitSeq() <= expiredCommitSeq) {
- binlogIter.remove();
- } else {
- break;
- }
- }
+ final long lastExpiredCommitSeq = expiredCommitSeq;
+ BinlogComparator checker = (binlog) -> {
+ // NOTE: TreeSet read size during iterator remove is valid.
+ //
+ // The expired conditions in order:
+ // 1. expired time
+ // 2. the max bytes
+ // 3. the max history num
+ return binlog.getCommitSeq() <= lastExpiredCommitSeq
+ || maxBytes < binlogSize
+ || maxHistoryNums < allBinlogs.size();
+ };
+ lastExpiredBinlog = getLastExpiredBinlog(checker);
} finally {
lock.writeLock().unlock();
}
- if (expiredCommitSeq == -1) {
+ if (lastExpiredBinlog == null) {
return null;
}
- // step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db tombstone
+ // step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db
+ // tombstone
List<BinlogTombstone> tableTombstones = Lists.newArrayList();
for (TableBinlog tableBinlog : tableBinlogMap.values()) {
// step 2.1: gc tableBinlog，and get table tombstone
- BinlogTombstone tableTombstone =
tableBinlog.commitSeqGc(expiredCommitSeq);
+ BinlogTombstone tableTombstone =
tableBinlog.commitSeqGc(lastExpiredBinlog.getCommitSeq());
if (tableTombstone != null) {
tableTombstones.add(tableTombstone);
}
@@ -386,28 +432,8 @@ public class DBBinlog {
lock.writeLock().lock();
try {
- Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
- while (timeIter.hasNext()) {
- long commitSeq = timeIter.next().first;
- if (commitSeq <= largestExpiredCommitSeq) {
- timeIter.remove();
- } else {
- break;
- }
- }
-
- Iterator<TBinlog> binlogIter = allBinlogs.iterator();
- TBinlog dummy = binlogIter.next();
- dummy.setCommitSeq(largestExpiredCommitSeq);
-
- while (binlogIter.hasNext()) {
- TBinlog binlog = binlogIter.next();
- if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
- binlogIter.remove();
- } else {
- break;
- }
- }
+ BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <=
largestExpiredCommitSeq;
+ getLastExpiredBinlog(checker);
} finally {
lock.writeLock().unlock();
}
@@ -478,6 +504,8 @@ public class DBBinlog {
info.add(dropped);
String binlogLength = String.valueOf(allBinlogs.size());
info.add(binlogLength);
+ String binlogSize = String.valueOf(this.binlogSize);
+ info.add(binlogSize);
String firstBinlogCommittedTime = null;
String readableFirstBinlogCommittedTime = null;
if (!timestamps.isEmpty()) {
@@ -497,10 +525,16 @@ public class DBBinlog {
info.add(lastBinlogCommittedTime);
info.add(readableLastBinlogCommittedTime);
String binlogTtlSeconds = null;
+ String binlogMaxBytes = null;
+ String binlogMaxHistoryNums = null;
if (binlogConfig != null) {
binlogTtlSeconds =
String.valueOf(binlogConfig.getTtlSeconds());
+ binlogMaxBytes =
String.valueOf(binlogConfig.getMaxBytes());
+ binlogMaxHistoryNums =
String.valueOf(binlogConfig.getMaxHistoryNums());
}
info.add(binlogTtlSeconds);
+ info.add(binlogMaxBytes);
+ info.add(binlogMaxHistoryNums);
result.addRow(info);
} else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 3dd290a07f8..36ec4f733ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -27,6 +27,7 @@ import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -43,15 +44,23 @@ public class TableBinlog {
private long dbId;
private long tableId;
+ private long binlogSize;
private ReentrantReadWriteLock lock;
private TreeSet<TBinlog> binlogs;
+
+ // Pair(commitSeq, timestamp), used for gc
+ // need UpsertRecord to add timestamps for gc
+ private List<Pair<Long, Long>> timestamps;
+
private BinlogConfigCache binlogConfigCache;
public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog,
long dbId, long tableId) {
this.dbId = dbId;
this.tableId = tableId;
+ this.binlogSize = 0;
lock = new ReentrantReadWriteLock();
binlogs =
Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
+ timestamps = Lists.newArrayList();
TBinlog dummy;
if (binlog.getType() == TBinlogType.DUMMY) {
@@ -77,6 +86,10 @@ public class TableBinlog {
if (binlog.getCommitSeq() > dummy.getCommitSeq()) {
binlogs.add(binlog);
++binlog.table_ref;
+ binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
+ if (binlog.getTimestamp() > 0) {
+ timestamps.add(Pair.of(binlog.getCommitSeq(),
binlog.getTimestamp()));
+ }
}
}
@@ -85,6 +98,10 @@ public class TableBinlog {
try {
binlogs.add(binlog);
++binlog.table_ref;
+ binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
+ if (binlog.getTimestamp() > 0) {
+ timestamps.add(Pair.of(binlog.getCommitSeq(),
binlog.getTimestamp()));
+ }
} finally {
lock.writeLock().unlock();
}
@@ -108,7 +125,7 @@ public class TableBinlog {
}
}
- private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(long expired,
BinlogComparator checker) {
+ private Pair<TBinlog, Long>
getLastUpsertAndLargestCommitSeq(BinlogComparator checker) {
if (binlogs.size() <= 1) {
return null;
}
@@ -119,9 +136,10 @@ public class TableBinlog {
TBinlog lastExpiredBinlog = null;
while (iter.hasNext()) {
TBinlog binlog = iter.next();
- if (checker.isExpired(binlog, expired)) {
+ if (checker.isExpired(binlog)) {
lastExpiredBinlog = binlog;
--binlog.table_ref;
+ binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog);
if (binlog.getType() == TBinlogType.UPSERT) {
tombstoneUpsert = binlog;
}
@@ -135,9 +153,15 @@ public class TableBinlog {
return null;
}
- dummyBinlog.setCommitSeq(lastExpiredBinlog.getCommitSeq());
+ long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
+ dummyBinlog.setCommitSeq(expiredCommitSeq);
- return Pair.of(tombstoneUpsert, lastExpiredBinlog.getCommitSeq());
+ Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
+ while (timeIterator.hasNext() && timeIterator.next().first <=
expiredCommitSeq) {
+ timeIterator.remove();
+ }
+
+ return Pair.of(tombstoneUpsert, expiredCommitSeq);
}
// this method call when db binlog enable
@@ -147,8 +171,8 @@ public class TableBinlog {
// step 1: get tombstoneUpsertBinlog and dummyBinlog
lock.writeLock().lock();
try {
- BinlogComparator check = (binlog, expire) -> binlog.getCommitSeq()
<= expire;
- tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredCommitSeq,
check);
+ BinlogComparator check = (binlog) -> binlog.getCommitSeq() <=
expiredCommitSeq;
+ tombstoneInfo = getLastUpsertAndLargestCommitSeq(check);
} finally {
lock.writeLock().unlock();
}
@@ -171,7 +195,7 @@ public class TableBinlog {
}
// this method call when db binlog disable
- public BinlogTombstone ttlGc() {
+ public BinlogTombstone gc() {
// step 1: get expire time
BinlogConfig tableBinlogConfig =
binlogConfigCache.getTableBinlogConfig(dbId, tableId);
if (tableBinlogConfig == null) {
@@ -179,19 +203,43 @@ public class TableBinlog {
}
long ttlSeconds = tableBinlogConfig.getTtlSeconds();
+ long maxBytes = tableBinlogConfig.getMaxBytes();
+ long maxHistoryNums = tableBinlogConfig.getMaxHistoryNums();
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
- if (expiredMs < 0) {
- return null;
- }
- LOG.info("ttl gc. dbId: {}, tableId: {}, expiredMs: {}", dbId,
tableId, expiredMs);
+ LOG.info(
+ "gc table binlog. dbId: {}, tableId: {}, expiredMs: {},
ttlSecond: {}, maxBytes: {}, "
+ + "maxHistoryNums: {}, now: {}",
+ dbId, tableId, expiredMs, ttlSeconds, maxBytes,
maxHistoryNums, System.currentTimeMillis());
// step 2: get tombstoneUpsertBinlog and dummyBinlog
Pair<TBinlog, Long> tombstoneInfo;
lock.writeLock().lock();
try {
- BinlogComparator check = (binlog, expire) -> binlog.getTimestamp()
<= expire;
- tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredMs, check);
+ // find the last expired commit seq.
+ long expiredCommitSeq = -1;
+ Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
+ while (timeIterator.hasNext()) {
+ Pair<Long, Long> entry = timeIterator.next();
+ if (expiredMs < entry.second) {
+ break;
+ }
+ expiredCommitSeq = entry.first;
+ }
+
+ final long lastExpiredCommitSeq = expiredCommitSeq;
+ BinlogComparator check = (binlog) -> {
+ // NOTE: TreeSet read size during iterator remove is valid.
+ //
+ // The expired conditions in order:
+ // 1. expired time
+ // 2. the max bytes
+ // 3. the max history num
+ return binlog.getCommitSeq() <= lastExpiredCommitSeq
+ || maxBytes < binlogSize
+ || maxHistoryNums < binlogs.size();
+ };
+ tombstoneInfo = getLastUpsertAndLargestCommitSeq(check);
} finally {
lock.writeLock().unlock();
}
@@ -216,25 +264,8 @@ public class TableBinlog {
public void replayGc(long largestExpiredCommitSeq) {
lock.writeLock().lock();
try {
- long lastSeq = -1;
- Iterator<TBinlog> iter = binlogs.iterator();
- TBinlog dummyBinlog = iter.next();
-
- while (iter.hasNext()) {
- TBinlog binlog = iter.next();
- long commitSeq = binlog.getCommitSeq();
- if (commitSeq <= largestExpiredCommitSeq) {
- lastSeq = commitSeq;
- --binlog.table_ref;
- iter.remove();
- } else {
- break;
- }
- }
-
- if (lastSeq != -1) {
- dummyBinlog.setCommitSeq(lastSeq);
- }
+ BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <=
largestExpiredCommitSeq;
+ getLastUpsertAndLargestCommitSeq(checker);
} finally {
lock.writeLock().unlock();
}
@@ -278,6 +309,8 @@ public class TableBinlog {
info.add(dropped);
String binlogLength = String.valueOf(binlogs.size());
info.add(binlogLength);
+ String binlogSize = String.valueOf(this.binlogSize);
+ info.add(binlogSize);
String firstBinlogCommittedTime = null;
String readableFirstBinlogCommittedTime = null;
for (TBinlog binlog : binlogs) {
@@ -305,10 +338,16 @@ public class TableBinlog {
info.add(lastBinlogCommittedTime);
info.add(readableLastBinlogCommittedTime);
String binlogTtlSeconds = null;
+ String binlogMaxBytes = null;
+ String binlogMaxHistoryNums = null;
if (binlogConfig != null) {
binlogTtlSeconds =
String.valueOf(binlogConfig.getTtlSeconds());
+ binlogMaxBytes = String.valueOf(binlogConfig.getMaxBytes());
+ binlogMaxHistoryNums =
String.valueOf(binlogConfig.getMaxHistoryNums());
}
info.add(binlogTtlSeconds);
+ info.add(binlogMaxBytes);
+ info.add(binlogMaxHistoryNums);
result.addRow(info);
} finally {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java
index 03f8d325d77..9542972359a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java
@@ -277,14 +277,9 @@ public class BinlogManagerTest {
for (Map.Entry<Long, List<Long>> dbEntry : frameWork.entrySet()) {
long dbId = dbEntry.getKey();
for (long tableId : dbEntry.getValue()) {
- if ((tableId / tableBaseId) % 2 != 0) {
- addBinlog.invoke(originManager,
BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow));
- addBinlog.invoke(newManager,
BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow));
- ++commitSeq;
- } else {
- addBinlog.invoke(originManager,
BinlogTestUtils.newBinlog(dbId, tableId, 0, 0));
- addBinlog.invoke(newManager,
BinlogTestUtils.newBinlog(dbId, tableId, 0, 0));
- }
+ addBinlog.invoke(originManager,
BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow));
+ addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId,
tableId, commitSeq, timeNow));
+ ++commitSeq;
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java
index b4ecd8a90c5..cd86c5935e1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java
@@ -75,7 +75,7 @@ public class TableBinlogTest {
}
// trigger ttlGc
- BinlogTombstone tombstone = tableBinlog.ttlGc();
+ BinlogTombstone tombstone = tableBinlog.gc();
// check binlog status
for (TBinlog binlog : testBinlogs) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]