This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 67d29b9f57 [core] Partition expire should also clean done partitions 
(#7507)
67d29b9f57 is described below

commit 67d29b9f572c8fdace5e654509794989a72ba704
Author: tsreaper <[email protected]>
AuthorDate: Tue Mar 24 09:46:39 2026 +0800

    [core] Partition expire should also clean done partitions (#7507)
    
    Currently, partition expire does not clean `.done` partitions, so more
    and more `.done` partitions remain in the metastore. This PR fixes this
    issue.
---
 .../apache/paimon/operation/PartitionExpire.java   | 23 ++++++++++
 .../paimon/operation/PartitionExpireTest.java      | 51 +++++++++++++++++++++-
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 25 +++++++++++
 3 files changed, 97 insertions(+), 2 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index d32161dc0c..74b7850ed7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -37,6 +37,7 @@ import java.time.Duration;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -180,14 +181,36 @@ public class PartitionExpire {
         if (partitionModification != null) {
             try {
                 partitionModification.dropPartitions(expiredBatchPartitions);
+                // also drop corresponding .done partitions
+                
partitionModification.dropPartitions(toDonePartitions(expiredBatchPartitions));
             } catch (Catalog.TableNotExistException e) {
                 throw new RuntimeException(e);
             }
         } else {
+            // .done partitions only exist when partitionModification != null
+            // (metastore.partitioned-table = true), so no need to handle them 
here
             commit.dropPartitions(expiredBatchPartitions, commitIdentifier);
         }
     }
 
+    private List<Map<String, String>> toDonePartitions(
+            List<Map<String, String>> expiredPartitions) {
+        List<Map<String, String>> donePartitions = new 
ArrayList<>(expiredPartitions.size());
+        for (Map<String, String> partition : expiredPartitions) {
+            LinkedHashMap<String, String> donePartition = new 
LinkedHashMap<>(partition);
+            // append .done suffix to the last partition field value
+            Map.Entry<String, String> lastEntry = null;
+            for (Map.Entry<String, String> entry : donePartition.entrySet()) {
+                lastEntry = entry;
+            }
+            if (lastEntry != null) {
+                donePartition.put(lastEntry.getKey(), lastEntry.getValue() + 
".done");
+                donePartitions.add(donePartition);
+            }
+        }
+        return donePartitions;
+    }
+
     private List<Map<String, String>> convertToPartitionString(
             List<List<String>> expiredPartValues) {
         return expiredPartValues.stream()
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 4eef95b55b..fd406dfad3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.partition.actions.AddDonePartitionAction;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -64,9 +65,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -89,6 +92,7 @@ public class PartitionExpireTest {
 
     private Path path;
     private FileStoreTable table;
+    private Set<Map<String, String>> createdPartitions;
     private List<Map<String, String>> deletedPartitions;
 
     @BeforeEach
@@ -103,17 +107,25 @@ public class PartitionExpireTest {
         Path tablePath = CoreOptions.path(options);
         String branchName = CoreOptions.branch(options.toMap());
         TableSchema tableSchema = new SchemaManager(fileIO, tablePath, 
branchName).latest().get();
+        createdPartitions = new HashSet<>();
         deletedPartitions = new ArrayList<>();
         PartitionModification partitionModification =
                 new PartitionModification() {
                     @Override
                     public void createPartitions(List<Map<String, String>> 
partitions)
-                            throws Catalog.TableNotExistException {}
+                            throws Catalog.TableNotExistException {
+                        createdPartitions.addAll(partitions);
+                    }
 
                     @Override
                     public void dropPartitions(List<Map<String, String>> 
partitions)
                             throws Catalog.TableNotExistException {
-                        deletedPartitions.addAll(partitions);
+                        for (Map<String, String> partition : partitions) {
+                            // only record partitions that were created
+                            if (createdPartitions.contains(partition)) {
+                                deletedPartitions.add(partition);
+                            }
+                        }
                         try (FileStoreCommit commit =
                                 table.store()
                                         .newCommit(
@@ -285,6 +297,41 @@ public class PartitionExpireTest {
                         new LinkedHashMap<>(Collections.singletonMap("f0", 
"20230105")));
     }
 
+    @Test
+    public void testDonePartitionExpire() throws Exception {
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
path);
+        schemaManager.createTable(
+                new Schema(
+                        RowType.of(VarCharType.STRING_TYPE, 
VarCharType.STRING_TYPE).getFields(),
+                        singletonList("f0"),
+                        emptyList(),
+                        
Collections.singletonMap(METASTORE_PARTITIONED_TABLE.key(), "true"),
+                        ""));
+        newTable();
+
+        write("20230101", "11");
+        write("20230103", "31");
+        write("20230108", "81");
+
+        AddDonePartitionAction doneAction =
+                new 
AddDonePartitionAction(table.catalogEnvironment().partitionModification());
+        doneAction.markDone("f0=20230101");
+        doneAction.markDone("f0=20230103");
+        doneAction.markDone("f0=20230108");
+
+        PartitionExpire expire = newExpire();
+        expire.setLastCheck(date(1));
+        expire.expire(date(8), Long.MAX_VALUE);
+
+        assertThat(deletedPartitions)
+                .containsExactlyInAnyOrder(
+                        new LinkedHashMap<>(Collections.singletonMap("f0", 
"20230101")),
+                        new LinkedHashMap<>(Collections.singletonMap("f0", 
"20230103")),
+                        new LinkedHashMap<>(Collections.singletonMap("f0", 
"20230101.done")),
+                        new LinkedHashMap<>(Collections.singletonMap("f0", 
"20230103.done")));
+        assertThat(read()).containsExactlyInAnyOrder("20230108:81");
+    }
+
     @Test
     public void testFilterCommittedAfterExpiring() throws Exception {
         SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
path);
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 5b5ee26bfd..4b9ed87980 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -1469,6 +1469,31 @@ public abstract class HiveCatalogITCaseBase {
         insertSql.getJobClient().get().cancel();
     }
 
+    @Test
+    public void testDonePartitionExpire() throws Exception {
+        tEnv.executeSql(
+                        "CREATE TABLE done_expire_t (a INT, dt STRING) 
PARTITIONED BY (dt) WITH ("
+                                + "'partition.timestamp-formatter'='yyyyMMdd',"
+                                + "'partition.timestamp-pattern'='$dt',"
+                                + "'metastore.partitioned-table'='true',"
+                                + 
"'partition.mark-done-action'='done-partition'"
+                                + ")")
+                .await();
+        tEnv.executeSql("INSERT INTO done_expire_t VALUES (1, 
'20240101')").await();
+        tEnv.executeSql("CALL sys.mark_partition_done('test_db.done_expire_t', 
'dt=20240101')")
+                .await();
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS done_expire_t"))
+                .containsExactlyInAnyOrder("dt=20240101", "dt=20240101.done");
+
+        tEnv.executeSql(
+                        "CALL sys.expire_partitions("
+                                + "`table` => 'test_db.done_expire_t', "
+                                + "expiration_time => '1 d', "
+                                + "timestamp_formatter => 'yyyyMMdd')")
+                .await();
+        assertThat(hiveShell.executeQuery("SHOW PARTITIONS 
done_expire_t")).isEmpty();
+    }
+
     @Test
     public void testRepairDatabasesOrTables() throws Exception {
         TableEnvironment fileCatalog = useFileCatalog("test_db");

Reply via email to