This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new ad6737796f6 branch-2.1: [fix](binlog) Fix table not gc binlog meta/records #46981 (#47257) ad6737796f6 is described below commit ad6737796f6c604de5f17eb5c6c7dde35c06bb76 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Jan 24 17:32:58 2025 +0800 branch-2.1: [fix](binlog) Fix table not gc binlog meta/records #46981 (#47257) Cherry-picked from #46981 Co-authored-by: Uniqueyou <wangyix...@selectdb.com> --- .../org/apache/doris/binlog/BinlogManager.java | 2 + .../java/org/apache/doris/binlog/DBBinlog.java | 13 +-- .../java/org/apache/doris/binlog/TableBinlog.java | 26 +++-- .../java/org/apache/doris/binlog/DbBinlogTest.java | 118 +++++++++++++++++++++ .../apache/doris/binlog/MockBinlogConfigCache.java | 5 + .../org/apache/doris/binlog/TableBinlogTest.java | 117 ++++++++++++++++++++ 6 files changed, 263 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 1128aa12cb1..3ec914abe63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -558,9 +558,11 @@ public class BinlogManager { tombstones.add(dbTombstones); } } + return tombstones; } + public void replayGc(BinlogGcInfo binlogGcInfo) { lock.writeLock().lock(); Map<Long, DBBinlog> gcDbBinlogMap; diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 0816564f150..9ffc20412fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -287,17 +287,11 @@ public class DBBinlog { if (dbBinlogConfig == null) { LOG.error("db not found. dbId: {}", dbId); return null; - } - - BinlogTombstone tombstone; - if (dbBinlogConfig.isEnable()) { - // db binlog is enabled, only one binlogTombstones - tombstone = dbBinlogEnableGc(dbBinlogConfig); + } else if (!dbBinlogConfig.isEnable()) { + return dbBinlogDisableGc(); } else { - tombstone = dbBinlogDisableGc(); + return dbBinlogEnableGc(dbBinlogConfig); } - - return tombstone; } private BinlogTombstone collectTableTombstone(List<BinlogTombstone> tableTombstones, boolean isDbGc) { @@ -341,6 +335,7 @@ public class DBBinlog { tombstones.add(tombstone); } } + BinlogTombstone tombstone = collectTableTombstone(tombstones, false); if (tombstone != null) { removeExpiredMetaData(tombstone.getCommitSeq()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index f3279b328c9..162adc2603b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -197,8 +197,11 @@ public class TableBinlog { public BinlogTombstone gc() { // step 1: get expire time BinlogConfig tableBinlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId); + Boolean isCleanFullBinlog = false; if (tableBinlogConfig == null) { return null; + } else if (!tableBinlogConfig.isEnable()) { + isCleanFullBinlog = true; } long ttlSeconds = tableBinlogConfig.getTtlSeconds(); @@ -208,22 +211,27 @@ public class TableBinlog { LOG.info( "gc table binlog. dbId: {}, tableId: {}, expiredMs: {}, ttlSecond: {}, maxBytes: {}, " - + "maxHistoryNums: {}, now: {}", - dbId, tableId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums, System.currentTimeMillis()); + + "maxHistoryNums: {}, now: {}, isCleanFullBinlog: {}", + dbId, tableId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums, System.currentTimeMillis(), + isCleanFullBinlog); // step 2: get tombstoneUpsertBinlog and dummyBinlog Pair<TBinlog, Long> tombstoneInfo; lock.writeLock().lock(); try { - // find the last expired commit seq. long expiredCommitSeq = -1; - Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator(); - while (timeIterator.hasNext()) { - Pair<Long, Long> entry = timeIterator.next(); - if (expiredMs < entry.second) { - break; + if (isCleanFullBinlog) { + expiredCommitSeq = binlogs.last().getCommitSeq(); + } else { + // find the last expired commit seq. + Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator(); + while (timeIterator.hasNext()) { + Pair<Long, Long> entry = timeIterator.next(); + if (expiredMs < entry.second) { + break; + } + expiredCommitSeq = entry.first; } - expiredCommitSeq = entry.first; } final long lastExpiredCommitSeq = expiredCommitSeq; diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java index 06230bfce56..939ae49f232 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java @@ -304,4 +304,122 @@ public class DbBinlogTest { } } } + + @Test + public void testDbAndTableGcWithDisable() { + // init base data + long expiredTime = baseNum + expiredBinlogNum; + Map<String, Long> ttlMap = Maps.newHashMap(); + for (int i = 0; i < tableNum; ++i) { + String key = String.format("%d_%d", dbId, baseTableId + i); + ttlMap.put(key, expiredTime); + } + MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap); + // disable db binlog + binlogConfigCache.addDbBinlogConfig(dbId, false, 0L); + // disable some table binlog + for (int i = 0; i <= gcTableNum; i++) { + binlogConfigCache.addTableBinlogConfig(dbId, baseTableId + i, false, expiredTime); + } + + // init & add binlogs + List<TBinlog> testBinlogs = Lists.newArrayList(); + Long[] tableLastCommitInfo = new Long[tableNum]; + long maxGcTableId = baseTableId + gcTableNum; + for (int i = 0; i < totalBinlogNum; ++i) { + long tableId = baseTableId + (i / tableNum); + long commitSeq = baseNum + i; + tableLastCommitInfo[i / tableNum] = commitSeq; + TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, baseNum); + testBinlogs.add(binlog); + } + + // init DbBinlog + DBBinlog dbBinlog = null; + + // insert binlogs + for (int i = 0; i < totalBinlogNum; ++i) { + if (dbBinlog == null) { + dbBinlog = new DBBinlog(binlogConfigCache, testBinlogs.get(i)); + } + dbBinlog.addBinlog(testBinlogs.get(i), null); + } + + // trigger gc + BinlogTombstone tombstone = dbBinlog.gc(); + + // check binlog status - all binlogs should be cleared for disabled tables + for (TBinlog binlog : testBinlogs) { + long tableId = binlog.getTableIds().get(0); + if (tableId <= maxGcTableId) { + // For disabled tables, all binlogs should be cleared + Assert.assertEquals(0, binlog.getTableRef()); + } else { + // For enabled tables, only expired binlogs should be cleared + if (binlog.getTimestamp() <= expiredTime) { + Assert.assertEquals(0, binlog.getTableRef()); + } else { + Assert.assertEquals(1, binlog.getTableRef()); + } + } + } + + // check tombstone + Assert.assertFalse(tombstone.isDbBinlogTomstone()); + Assert.assertEquals(baseNum + totalBinlogNum - 1, tombstone.getCommitSeq()); + } + + @Test + public void testDbAndTableGcWithEnable() { + // init base data + long expiredTime = baseNum + expiredBinlogNum; + Map<String, Long> ttlMap = Maps.newHashMap(); + for (int i = 0; i < tableNum; ++i) { + String key = String.format("%d_%d", dbId, baseTableId + i); + ttlMap.put(key, expiredTime); + } + MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap); + // enable db binlog + binlogConfigCache.addDbBinlogConfig(dbId, true, expiredTime); + // enable all table binlog + for (int i = 0; i < tableNum; i++) { + binlogConfigCache.addTableBinlogConfig(dbId, baseTableId + i, true, expiredTime); + } + + // init & add binlogs + List<TBinlog> testBinlogs = Lists.newArrayList(); + for (int i = 0; i < totalBinlogNum; ++i) { + long tableId = baseTableId + (i / tableNum); + long commitSeq = baseNum + i; + TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, baseNum + i); + testBinlogs.add(binlog); + } + + // init DbBinlog + DBBinlog dbBinlog = null; + + // insert binlogs + for (int i = 0; i < totalBinlogNum; ++i) { + if (dbBinlog == null) { + dbBinlog = new DBBinlog(binlogConfigCache, testBinlogs.get(i)); + } + dbBinlog.addBinlog(testBinlogs.get(i), null); + } + + // trigger gc + BinlogTombstone tombstone = dbBinlog.gc(); + + // check binlog status - only expired binlogs should be cleared + for (TBinlog binlog : testBinlogs) { + if (binlog.getTimestamp() <= expiredTime) { + Assert.assertEquals(0, binlog.getTableRef()); + } else { + Assert.assertEquals(1, binlog.getTableRef()); + } + } + + // check tombstone + Assert.assertTrue(tombstone.isDbBinlogTomstone()); + Assert.assertEquals(expiredTime, tombstone.getCommitSeq()); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java index 4622171e930..d2720bf61d5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java @@ -34,6 +34,11 @@ final class MockBinlogConfigCache extends BinlogConfigCache { mockedConfigs.put(String.valueOf(dbId), config); } + public void addTableBinlogConfig(long dbId, long tableId, boolean enableBinlog, long expiredTime) { + BinlogConfig config = BinlogTestUtils.newTestBinlogConfig(enableBinlog, expiredTime); + mockedConfigs.put(String.format("%d_%d", dbId, tableId), config); + } + @Override public BinlogConfig getTableBinlogConfig(long dbId, long tableId) { return mockedConfigs.get(String.format("%d_%d", dbId, tableId)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java index cd86c5935e1..e55a1f252f0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java @@ -20,6 +20,7 @@ package org.apache.doris.binlog; import org.apache.doris.thrift.TBinlog; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import mockit.Mock; import mockit.MockUp; import org.junit.Assert; @@ -27,6 +28,7 @@ import org.junit.Before; import org.junit.Test; import java.util.List; +import java.util.Map; public class TableBinlogTest { private long dbId = 10000; @@ -139,4 +141,119 @@ public class TableBinlogTest { TBinlog dummy = tableBinlog.getDummyBinlog(); Assert.assertEquals(expiredCommitSeq, dummy.getCommitSeq()); } + + @Test + public void testTableGcBinlogWithDisable() { + // mock BinlogUtils + new MockUp<BinlogUtils>() { + @Mock + public long getExpiredMs(long direct) { + return direct; + } + }; + Map<String, Long> ttlMap = Maps.newHashMap(); + + // init base data + long expiredTime = baseNum + expiredBinlogNum; + ttlMap.put(String.format("%d_%d", dbId, tableId), expiredTime); + + MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap); + + // disable table binlog + binlogConfigCache.addTableBinlogConfig(dbId, tableId, false, expiredTime); + + // init & add binlogs + List<TBinlog> testBinlogs = Lists.newArrayList(); + for (int i = 0; i < totalBinlogNum; ++i) { + TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, baseNum + i, baseNum + i); + testBinlogs.add(binlog); + } + + // init TableBinlog + TableBinlog tableBinlog = null; + + // insert binlogs + for (int i = 0; i < totalBinlogNum; ++i) { + if (tableBinlog == null) { + tableBinlog = new TableBinlog(binlogConfigCache, testBinlogs.get(i), dbId, tableId); + } + tableBinlog.addBinlog(testBinlogs.get(i)); + } + + // trigger gc + BinlogTombstone tombstone = tableBinlog.gc(); + + // check binlog status - all binlogs should be cleared when table binlog is disabled + for (TBinlog binlog : testBinlogs) { + Assert.assertEquals(0, binlog.getTableRef()); + } + + // check tombstone + Assert.assertFalse(tombstone.isDbBinlogTomstone()); + Assert.assertEquals(baseNum + totalBinlogNum - 1, tombstone.getCommitSeq()); + + // check dummy - should have the last commitSeq + TBinlog dummy = tableBinlog.getDummyBinlog(); + Assert.assertEquals(baseNum + totalBinlogNum - 1, dummy.getCommitSeq()); + } + + @Test + public void testTableGcBinlogWithEnable() { + // mock BinlogUtils + new MockUp<BinlogUtils>() { + @Mock + public long getExpiredMs(long direct) { + return direct; + } + }; + Map<String, Long> ttlMap = Maps.newHashMap(); + + // init base data + long expiredTime = baseNum + expiredBinlogNum; + ttlMap.put(String.format("%d_%d", dbId, tableId), expiredTime); + + MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap); + + // enable table binlog + binlogConfigCache.addTableBinlogConfig(dbId, tableId, true, expiredTime); + + // init & add binlogs + List<TBinlog> testBinlogs = Lists.newArrayList(); + for (int i = 0; i < totalBinlogNum; ++i) { + TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, baseNum + i, baseNum + i); + testBinlogs.add(binlog); + } + + // init TableBinlog + TableBinlog tableBinlog = null; + + // insert binlogs + for (int i = 0; i < totalBinlogNum; ++i) { + if (tableBinlog == null) { + tableBinlog = new TableBinlog(binlogConfigCache, testBinlogs.get(i), dbId, tableId); + } + tableBinlog.addBinlog(testBinlogs.get(i)); + } + + // trigger gc + BinlogTombstone tombstone = tableBinlog.gc(); + + // check binlog status - only expired binlogs should be cleared + for (TBinlog binlog : testBinlogs) { + if (binlog.getTimestamp() <= expiredTime) { + Assert.assertEquals(0, binlog.getTableRef()); + } else { + Assert.assertEquals(1, binlog.getTableRef()); + } + } + + // check tombstone + Assert.assertFalse(tombstone.isDbBinlogTomstone()); + Assert.assertEquals(expiredTime, tombstone.getCommitSeq()); + + // check dummy - should have the expiredTime as commitSeq + TBinlog dummy = tableBinlog.getDummyBinlog(); + Assert.assertEquals(expiredTime, dummy.getCommitSeq()); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org