This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 67d29b9f57 [core] Partition expire should also clean done partitions
(#7507)
67d29b9f57 is described below
commit 67d29b9f572c8fdace5e654509794989a72ba704
Author: tsreaper <[email protected]>
AuthorDate: Tue Mar 24 09:46:39 2026 +0800
[core] Partition expire should also clean done partitions (#7507)
Currently, partition expire does not clean `.done` partitions, so more
and more `.done` partitions remain in the metastore. This PR fixes this
issue.
---
.../apache/paimon/operation/PartitionExpire.java | 23 ++++++++++
.../paimon/operation/PartitionExpireTest.java | 51 +++++++++++++++++++++-
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 25 +++++++++++
3 files changed, 97 insertions(+), 2 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index d32161dc0c..74b7850ed7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -37,6 +37,7 @@ import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -180,14 +181,36 @@ public class PartitionExpire {
if (partitionModification != null) {
try {
partitionModification.dropPartitions(expiredBatchPartitions);
+ // also drop corresponding .done partitions
+
partitionModification.dropPartitions(toDonePartitions(expiredBatchPartitions));
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(e);
}
} else {
+ // .done partitions only exist when partitionModification != null
+ // (metastore.partitioned-table = true), so no need to handle them
here
commit.dropPartitions(expiredBatchPartitions, commitIdentifier);
}
}
+ private List<Map<String, String>> toDonePartitions(
+ List<Map<String, String>> expiredPartitions) {
+ List<Map<String, String>> donePartitions = new
ArrayList<>(expiredPartitions.size());
+ for (Map<String, String> partition : expiredPartitions) {
+ LinkedHashMap<String, String> donePartition = new
LinkedHashMap<>(partition);
+ // append .done suffix to the last partition field value
+ Map.Entry<String, String> lastEntry = null;
+ for (Map.Entry<String, String> entry : donePartition.entrySet()) {
+ lastEntry = entry;
+ }
+ if (lastEntry != null) {
+ donePartition.put(lastEntry.getKey(), lastEntry.getValue() +
".done");
+ donePartitions.add(donePartition);
+ }
+ }
+ return donePartitions;
+ }
+
private List<Map<String, String>> convertToPartitionString(
List<List<String>> expiredPartValues) {
return expiredPartValues.stream()
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 4eef95b55b..fd406dfad3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.partition.actions.AddDonePartitionAction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -64,9 +65,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -89,6 +92,7 @@ public class PartitionExpireTest {
private Path path;
private FileStoreTable table;
+ private Set<Map<String, String>> createdPartitions;
private List<Map<String, String>> deletedPartitions;
@BeforeEach
@@ -103,17 +107,25 @@ public class PartitionExpireTest {
Path tablePath = CoreOptions.path(options);
String branchName = CoreOptions.branch(options.toMap());
TableSchema tableSchema = new SchemaManager(fileIO, tablePath,
branchName).latest().get();
+ createdPartitions = new HashSet<>();
deletedPartitions = new ArrayList<>();
PartitionModification partitionModification =
new PartitionModification() {
@Override
public void createPartitions(List<Map<String, String>>
partitions)
- throws Catalog.TableNotExistException {}
+ throws Catalog.TableNotExistException {
+ createdPartitions.addAll(partitions);
+ }
@Override
public void dropPartitions(List<Map<String, String>>
partitions)
throws Catalog.TableNotExistException {
- deletedPartitions.addAll(partitions);
+ for (Map<String, String> partition : partitions) {
+ // only record partitions that were created
+ if (createdPartitions.contains(partition)) {
+ deletedPartitions.add(partition);
+ }
+ }
try (FileStoreCommit commit =
table.store()
.newCommit(
@@ -285,6 +297,41 @@ public class PartitionExpireTest {
new LinkedHashMap<>(Collections.singletonMap("f0",
"20230105")));
}
+ @Test
+ public void testDonePartitionExpire() throws Exception {
+ SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
path);
+ schemaManager.createTable(
+ new Schema(
+ RowType.of(VarCharType.STRING_TYPE,
VarCharType.STRING_TYPE).getFields(),
+ singletonList("f0"),
+ emptyList(),
+
Collections.singletonMap(METASTORE_PARTITIONED_TABLE.key(), "true"),
+ ""));
+ newTable();
+
+ write("20230101", "11");
+ write("20230103", "31");
+ write("20230108", "81");
+
+ AddDonePartitionAction doneAction =
+ new
AddDonePartitionAction(table.catalogEnvironment().partitionModification());
+ doneAction.markDone("f0=20230101");
+ doneAction.markDone("f0=20230103");
+ doneAction.markDone("f0=20230108");
+
+ PartitionExpire expire = newExpire();
+ expire.setLastCheck(date(1));
+ expire.expire(date(8), Long.MAX_VALUE);
+
+ assertThat(deletedPartitions)
+ .containsExactlyInAnyOrder(
+ new LinkedHashMap<>(Collections.singletonMap("f0",
"20230101")),
+ new LinkedHashMap<>(Collections.singletonMap("f0",
"20230103")),
+ new LinkedHashMap<>(Collections.singletonMap("f0",
"20230101.done")),
+ new LinkedHashMap<>(Collections.singletonMap("f0",
"20230103.done")));
+ assertThat(read()).containsExactlyInAnyOrder("20230108:81");
+ }
+
@Test
public void testFilterCommittedAfterExpiring() throws Exception {
SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
path);
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 5b5ee26bfd..4b9ed87980 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -1469,6 +1469,31 @@ public abstract class HiveCatalogITCaseBase {
insertSql.getJobClient().get().cancel();
}
+ @Test
+ public void testDonePartitionExpire() throws Exception {
+ tEnv.executeSql(
+ "CREATE TABLE done_expire_t (a INT, dt STRING)
PARTITIONED BY (dt) WITH ("
+ + "'partition.timestamp-formatter'='yyyyMMdd',"
+ + "'partition.timestamp-pattern'='$dt',"
+ + "'metastore.partitioned-table'='true',"
+ +
"'partition.mark-done-action'='done-partition'"
+ + ")")
+ .await();
+ tEnv.executeSql("INSERT INTO done_expire_t VALUES (1,
'20240101')").await();
+ tEnv.executeSql("CALL sys.mark_partition_done('test_db.done_expire_t',
'dt=20240101')")
+ .await();
+ assertThat(hiveShell.executeQuery("SHOW PARTITIONS done_expire_t"))
+ .containsExactlyInAnyOrder("dt=20240101", "dt=20240101.done");
+
+ tEnv.executeSql(
+ "CALL sys.expire_partitions("
+ + "`table` => 'test_db.done_expire_t', "
+ + "expiration_time => '1 d', "
+ + "timestamp_formatter => 'yyyyMMdd')")
+ .await();
+ assertThat(hiveShell.executeQuery("SHOW PARTITIONS
done_expire_t")).isEmpty();
+ }
+
@Test
public void testRepairDatabasesOrTables() throws Exception {
TableEnvironment fileCatalog = useFileCatalog("test_db");