This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new ad6737796f6 branch-2.1: [fix](binlog) Fix table not gc binlog meta/records #46981 (#47257)
ad6737796f6 is described below

commit ad6737796f6c604de5f17eb5c6c7dde35c06bb76
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 24 17:32:58 2025 +0800

    branch-2.1: [fix](binlog) Fix table not gc binlog meta/records #46981 (#47257)
    
    Cherry-picked from #46981
    
    Co-authored-by: Uniqueyou <wangyix...@selectdb.com>
---
 .../org/apache/doris/binlog/BinlogManager.java     |   2 +
 .../java/org/apache/doris/binlog/DBBinlog.java     |  13 +--
 .../java/org/apache/doris/binlog/TableBinlog.java  |  26 +++--
 .../java/org/apache/doris/binlog/DbBinlogTest.java | 118 +++++++++++++++++++++
 .../apache/doris/binlog/MockBinlogConfigCache.java |   5 +
 .../org/apache/doris/binlog/TableBinlogTest.java   | 117 ++++++++++++++++++++
 6 files changed, 263 insertions(+), 18 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 1128aa12cb1..3ec914abe63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -558,9 +558,11 @@ public class BinlogManager {
                 tombstones.add(dbTombstones);
             }
         }
+
         return tombstones;
     }
 
+
     public void replayGc(BinlogGcInfo binlogGcInfo) {
         lock.writeLock().lock();
         Map<Long, DBBinlog> gcDbBinlogMap;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 0816564f150..9ffc20412fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -287,17 +287,11 @@ public class DBBinlog {
         if (dbBinlogConfig == null) {
             LOG.error("db not found. dbId: {}", dbId);
             return null;
-        }
-
-        BinlogTombstone tombstone;
-        if (dbBinlogConfig.isEnable()) {
-            // db binlog is enabled, only one binlogTombstones
-            tombstone = dbBinlogEnableGc(dbBinlogConfig);
+        } else if (!dbBinlogConfig.isEnable()) {
+            return dbBinlogDisableGc();
         } else {
-            tombstone = dbBinlogDisableGc();
+            return dbBinlogEnableGc(dbBinlogConfig);
         }
-
-        return tombstone;
     }
 
     private BinlogTombstone collectTableTombstone(List<BinlogTombstone> tableTombstones, boolean isDbGc) {
@@ -341,6 +335,7 @@ public class DBBinlog {
                 tombstones.add(tombstone);
             }
         }
+
         BinlogTombstone tombstone = collectTableTombstone(tombstones, false);
         if (tombstone != null) {
             removeExpiredMetaData(tombstone.getCommitSeq());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index f3279b328c9..162adc2603b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -197,8 +197,11 @@ public class TableBinlog {
     public BinlogTombstone gc() {
         // step 1: get expire time
         BinlogConfig tableBinlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId);
+        Boolean isCleanFullBinlog = false;
         if (tableBinlogConfig == null) {
             return null;
+        } else if (!tableBinlogConfig.isEnable()) {
+            isCleanFullBinlog = true;
         }
 
         long ttlSeconds = tableBinlogConfig.getTtlSeconds();
@@ -208,22 +211,27 @@ public class TableBinlog {
 
         LOG.info(
                 "gc table binlog. dbId: {}, tableId: {}, expiredMs: {}, ttlSecond: {}, maxBytes: {}, "
-                        + "maxHistoryNums: {}, now: {}",
-                dbId, tableId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums, System.currentTimeMillis());
+                        + "maxHistoryNums: {}, now: {}, isCleanFullBinlog: {}",
+                dbId, tableId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums, System.currentTimeMillis(),
+                isCleanFullBinlog);
 
         // step 2: get tombstoneUpsertBinlog and dummyBinlog
         Pair<TBinlog, Long> tombstoneInfo;
         lock.writeLock().lock();
         try {
-            // find the last expired commit seq.
             long expiredCommitSeq = -1;
-            Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
-            while (timeIterator.hasNext()) {
-                Pair<Long, Long> entry = timeIterator.next();
-                if (expiredMs < entry.second) {
-                    break;
+            if (isCleanFullBinlog) {
+                expiredCommitSeq = binlogs.last().getCommitSeq();
+            } else {
+                // find the last expired commit seq.
+                Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
+                while (timeIterator.hasNext()) {
+                    Pair<Long, Long> entry = timeIterator.next();
+                    if (expiredMs < entry.second) {
+                        break;
+                    }
+                    expiredCommitSeq = entry.first;
                 }
-                expiredCommitSeq = entry.first;
             }
 
             final long lastExpiredCommitSeq = expiredCommitSeq;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java
index 06230bfce56..939ae49f232 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java
@@ -304,4 +304,122 @@ public class DbBinlogTest {
             }
         }
     }
+
+    @Test
+    public void testDbAndTableGcWithDisable() {
+        // init base data
+        long expiredTime = baseNum + expiredBinlogNum;
+        Map<String, Long> ttlMap = Maps.newHashMap();
+        for (int i = 0; i < tableNum; ++i) {
+            String key = String.format("%d_%d", dbId, baseTableId + i);
+            ttlMap.put(key, expiredTime);
+        }
+        MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap);
+        // disable db binlog
+        binlogConfigCache.addDbBinlogConfig(dbId, false, 0L);
+        // disable some table binlog
+        for (int i = 0; i <= gcTableNum; i++) {
+            binlogConfigCache.addTableBinlogConfig(dbId, baseTableId + i, false, expiredTime);
+        }
+
+        // init & add binlogs
+        List<TBinlog> testBinlogs = Lists.newArrayList();
+        Long[] tableLastCommitInfo = new Long[tableNum];
+        long maxGcTableId = baseTableId + gcTableNum;
+        for (int i = 0; i < totalBinlogNum; ++i) {
+            long tableId = baseTableId + (i / tableNum);
+            long commitSeq = baseNum + i;
+            tableLastCommitInfo[i / tableNum] = commitSeq;
+            TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, baseNum);
+            testBinlogs.add(binlog);
+        }
+
+        // init DbBinlog
+        DBBinlog dbBinlog = null;
+
+        // insert binlogs
+        for (int i = 0; i < totalBinlogNum; ++i) {
+            if (dbBinlog == null) {
+                dbBinlog = new DBBinlog(binlogConfigCache, testBinlogs.get(i));
+            }
+            dbBinlog.addBinlog(testBinlogs.get(i), null);
+        }
+
+        // trigger gc
+        BinlogTombstone tombstone = dbBinlog.gc();
+
+        // check binlog status - all binlogs should be cleared for disabled tables
+        for (TBinlog binlog : testBinlogs) {
+            long tableId = binlog.getTableIds().get(0);
+            if (tableId <= maxGcTableId) {
+                // For disabled tables, all binlogs should be cleared
+                Assert.assertEquals(0, binlog.getTableRef());
+            } else {
+                // For enabled tables, only expired binlogs should be cleared
+                if (binlog.getTimestamp() <= expiredTime) {
+                    Assert.assertEquals(0, binlog.getTableRef());
+                } else {
+                    Assert.assertEquals(1, binlog.getTableRef());
+                }
+            }
+        }
+
+        // check tombstone
+        Assert.assertFalse(tombstone.isDbBinlogTomstone());
+        Assert.assertEquals(baseNum + totalBinlogNum - 1, tombstone.getCommitSeq());
+    }
+
+    @Test
+    public void testDbAndTableGcWithEnable() {
+        // init base data
+        long expiredTime = baseNum + expiredBinlogNum;
+        Map<String, Long> ttlMap = Maps.newHashMap();
+        for (int i = 0; i < tableNum; ++i) {
+            String key = String.format("%d_%d", dbId, baseTableId + i);
+            ttlMap.put(key, expiredTime);
+        }
+        MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap);
+        // enable db binlog
+        binlogConfigCache.addDbBinlogConfig(dbId, true, expiredTime);
+        // enable all table binlog
+        for (int i = 0; i < tableNum; i++) {
+            binlogConfigCache.addTableBinlogConfig(dbId, baseTableId + i, true, expiredTime);
+        }
+
+        // init & add binlogs
+        List<TBinlog> testBinlogs = Lists.newArrayList();
+        for (int i = 0; i < totalBinlogNum; ++i) {
+            long tableId = baseTableId + (i / tableNum);
+            long commitSeq = baseNum + i;
+            TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, baseNum + i);
+            testBinlogs.add(binlog);
+        }
+
+        // init DbBinlog
+        DBBinlog dbBinlog = null;
+
+        // insert binlogs
+        for (int i = 0; i < totalBinlogNum; ++i) {
+            if (dbBinlog == null) {
+                dbBinlog = new DBBinlog(binlogConfigCache, testBinlogs.get(i));
+            }
+            dbBinlog.addBinlog(testBinlogs.get(i), null);
+        }
+
+        // trigger gc
+        BinlogTombstone tombstone = dbBinlog.gc();
+
+        // check binlog status - only expired binlogs should be cleared
+        for (TBinlog binlog : testBinlogs) {
+            if (binlog.getTimestamp() <= expiredTime) {
+                Assert.assertEquals(0, binlog.getTableRef());
+            } else {
+                Assert.assertEquals(1, binlog.getTableRef());
+            }
+        }
+
+        // check tombstone
+        Assert.assertTrue(tombstone.isDbBinlogTomstone());
+        Assert.assertEquals(expiredTime, tombstone.getCommitSeq());
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java
index 4622171e930..d2720bf61d5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java
@@ -34,6 +34,11 @@ final class MockBinlogConfigCache extends BinlogConfigCache {
         mockedConfigs.put(String.valueOf(dbId), config);
     }
 
+    public void addTableBinlogConfig(long dbId, long tableId, boolean enableBinlog, long expiredTime) {
+        BinlogConfig config = BinlogTestUtils.newTestBinlogConfig(enableBinlog, expiredTime);
+        mockedConfigs.put(String.format("%d_%d", dbId, tableId), config);
+    }
+
     @Override
     public BinlogConfig getTableBinlogConfig(long dbId, long tableId) {
         return mockedConfigs.get(String.format("%d_%d", dbId, tableId));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java
index cd86c5935e1..e55a1f252f0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.binlog;
 import org.apache.doris.thrift.TBinlog;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import mockit.Mock;
 import mockit.MockUp;
 import org.junit.Assert;
@@ -27,6 +28,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.Map;
 
 public class TableBinlogTest {
     private long dbId = 10000;
@@ -139,4 +141,119 @@ public class TableBinlogTest {
         TBinlog dummy = tableBinlog.getDummyBinlog();
         Assert.assertEquals(expiredCommitSeq, dummy.getCommitSeq());
     }
+
+    @Test
+    public void testTableGcBinlogWithDisable() {
+        // mock BinlogUtils
+        new MockUp<BinlogUtils>() {
+            @Mock
+            public long getExpiredMs(long direct) {
+                return direct;
+            }
+        };
+        Map<String, Long> ttlMap = Maps.newHashMap();
+
+        // init base data
+        long expiredTime = baseNum + expiredBinlogNum;
+        ttlMap.put(String.format("%d_%d", dbId, tableId), expiredTime);
+
+        MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap);
+
+        // disable table binlog
+        binlogConfigCache.addTableBinlogConfig(dbId, tableId, false, expiredTime);
+
+        // init & add binlogs
+        List<TBinlog> testBinlogs = Lists.newArrayList();
+        for (int i = 0; i < totalBinlogNum; ++i) {
+            TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, baseNum + i, baseNum + i);
+            testBinlogs.add(binlog);
+        }
+
+        // init TableBinlog
+        TableBinlog tableBinlog = null;
+
+        // insert binlogs
+        for (int i = 0; i < totalBinlogNum; ++i) {
+            if (tableBinlog == null) {
+                tableBinlog = new TableBinlog(binlogConfigCache, testBinlogs.get(i), dbId, tableId);
+            }
+            tableBinlog.addBinlog(testBinlogs.get(i));
+        }
+
+        // trigger gc
+        BinlogTombstone tombstone = tableBinlog.gc();
+
+        // check binlog status - all binlogs should be cleared when table binlog is disabled
+        for (TBinlog binlog : testBinlogs) {
+            Assert.assertEquals(0, binlog.getTableRef());
+        }
+
+        // check tombstone
+        Assert.assertFalse(tombstone.isDbBinlogTomstone());
+        Assert.assertEquals(baseNum + totalBinlogNum - 1, tombstone.getCommitSeq());
+
+        // check dummy - should have the last commitSeq
+        TBinlog dummy = tableBinlog.getDummyBinlog();
+        Assert.assertEquals(baseNum + totalBinlogNum - 1, dummy.getCommitSeq());
+    }
+
+    @Test
+    public void testTableGcBinlogWithEnable() {
+        // mock BinlogUtils
+        new MockUp<BinlogUtils>() {
+            @Mock
+            public long getExpiredMs(long direct) {
+                return direct;
+            }
+        };
+        Map<String, Long> ttlMap = Maps.newHashMap();
+
+        // init base data
+        long expiredTime = baseNum + expiredBinlogNum;
+        ttlMap.put(String.format("%d_%d", dbId, tableId), expiredTime);
+
+        MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap);
+
+        // enable table binlog
+        binlogConfigCache.addTableBinlogConfig(dbId, tableId, true, expiredTime);
+
+        // init & add binlogs
+        List<TBinlog> testBinlogs = Lists.newArrayList();
+        for (int i = 0; i < totalBinlogNum; ++i) {
+            TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, baseNum + i, baseNum + i);
+            testBinlogs.add(binlog);
+        }
+
+        // init TableBinlog
+        TableBinlog tableBinlog = null;
+
+        // insert binlogs
+        for (int i = 0; i < totalBinlogNum; ++i) {
+            if (tableBinlog == null) {
+                tableBinlog = new TableBinlog(binlogConfigCache, testBinlogs.get(i), dbId, tableId);
+            }
+            tableBinlog.addBinlog(testBinlogs.get(i));
+        }
+
+        // trigger gc
+        BinlogTombstone tombstone = tableBinlog.gc();
+
+        // check binlog status - only expired binlogs should be cleared
+        for (TBinlog binlog : testBinlogs) {
+            if (binlog.getTimestamp() <= expiredTime) {
+                Assert.assertEquals(0, binlog.getTableRef());
+            } else {
+                Assert.assertEquals(1, binlog.getTableRef());
+            }
+        }
+
+        // check tombstone
+        Assert.assertFalse(tombstone.isDbBinlogTomstone());
+        Assert.assertEquals(expiredTime, tombstone.getCommitSeq());
+
+        // check dummy - should have the expiredTime as commitSeq
+        TBinlog dummy = tableBinlog.getDummyBinlog();
+        Assert.assertEquals(expiredTime, dummy.getCommitSeq());
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to