This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5db9bc587e [core] Introduce 'consumer.changelog-only' to keep less 
snapshots (#7499)
5db9bc587e is described below

commit 5db9bc587e353a05e3911d364d4e1785132f15e5
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 23 15:37:48 2026 +0800

    [core] Introduce 'consumer.changelog-only' to keep less snapshots (#7499)
    
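    Introduce `consumer.changelog-only`: if true, the consumer will only affect
    changelog expiration and will not prevent snapshots from being expired.
    When read through `CoreOptions#consumerChangelogOnly()`, the option only
    takes effect if the changelog lifecycle is decoupled from snapshots.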
---
 .../shortcodes/generated/core_configuration.html   |  6 +++
 .../main/java/org/apache/paimon/CoreOptions.java   | 13 +++++
 .../org/apache/paimon/options/ExpireConfig.java    | 18 ++++++-
 .../apache/paimon/table/ExpireSnapshotsImpl.java   |  7 ++-
 .../paimon/operation/ExpireSnapshotsTest.java      | 55 ++++++++++++++++++++++
 5 files changed, 95 insertions(+), 4 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index e24b4a2795..44aeee4980 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -380,6 +380,12 @@ under the License.
             <td>String</td>
             <td>Consumer id for recording the offset of consumption in the 
storage.</td>
         </tr>
+        <tr>
+            <td><h5>consumer.changelog-only</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, consumer will only affect changelog expiration and 
will not prevent snapshot from being expired.</td>
+        </tr>
         <tr>
             <td><h5>consumer.expiration-time</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 78ce3d9367..569615f8b0 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1383,6 +1383,14 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether to ignore consumer progress for the newly 
started job.");
 
+    public static final ConfigOption<Boolean> CONSUMER_CHANGELOG_ONLY =
+            key("consumer.changelog-only")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, consumer will only affect changelog 
expiration "
+                                    + "and will not prevent snapshot from 
being expired.");
+
     public static final ConfigOption<Long> DYNAMIC_BUCKET_TARGET_ROW_NUM =
             key("dynamic-bucket.target-row-num")
                     .longType()
@@ -2738,6 +2746,7 @@ public class CoreOptions implements Serializable {
                 
.changelogRetainMin(options.getOptional(CHANGELOG_NUM_RETAINED_MIN).orElse(null))
                 
.changelogTimeRetain(options.getOptional(CHANGELOG_TIME_RETAINED).orElse(null))
                 .changelogMaxDeletes(snapshotExpireLimit())
+                .consumerChangelogOnly(consumerChangelogOnly())
                 .build();
     }
 
@@ -3368,6 +3377,10 @@ public class CoreOptions implements Serializable {
         return options.get(CONSUMER_IGNORE_PROGRESS);
     }
 
+    public boolean consumerChangelogOnly() {
+        return options.get(CONSUMER_CHANGELOG_ONLY) && 
changelogLifecycleDecoupled();
+    }
+
     public boolean partitionedTableInMetastore() {
         return options.get(METASTORE_PARTITIONED_TABLE);
     }
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java 
b/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java
index 0de3828db0..bce215ed5d 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java
@@ -31,6 +31,7 @@ public class ExpireConfig {
     private final Duration changelogTimeRetain;
     private final int changelogMaxDeletes;
     private final boolean changelogDecoupled;
+    private final boolean consumerChangelogOnly;
 
     public ExpireConfig(
             int snapshotRetainMax,
@@ -40,7 +41,8 @@ public class ExpireConfig {
             int changelogRetainMax,
             int changelogRetainMin,
             Duration changelogTimeRetain,
-            int changelogMaxDeletes) {
+            int changelogMaxDeletes,
+            boolean consumerChangelogOnly) {
         this.snapshotRetainMax = snapshotRetainMax;
         this.snapshotRetainMin = snapshotRetainMin;
         this.snapshotTimeRetain = snapshotTimeRetain;
@@ -53,6 +55,7 @@ public class ExpireConfig {
                 changelogRetainMax > snapshotRetainMax
                         || changelogRetainMin > snapshotRetainMin
                         || changelogTimeRetain.compareTo(snapshotTimeRetain) > 
0;
+        this.consumerChangelogOnly = consumerChangelogOnly;
     }
 
     public int getSnapshotRetainMax() {
@@ -91,6 +94,10 @@ public class ExpireConfig {
         return changelogDecoupled;
     }
 
+    public boolean isConsumerChangelogOnly() {
+        return consumerChangelogOnly;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -106,6 +113,7 @@ public class ExpireConfig {
         private Integer changelogRetainMin = null;
         private Duration changelogTimeRetain = null;
         private Integer changelogMaxDeletes = null;
+        private boolean consumerChangelogOnly = false;
 
         public static Builder builder() {
             return new Builder();
@@ -151,6 +159,11 @@ public class ExpireConfig {
             return this;
         }
 
+        public Builder consumerChangelogOnly(boolean consumerChangelogOnly) {
+            this.consumerChangelogOnly = consumerChangelogOnly;
+            return this;
+        }
+
         public ExpireConfig build() {
             return new ExpireConfig(
                     snapshotRetainMax,
@@ -160,7 +173,8 @@ public class ExpireConfig {
                     changelogRetainMax == null ? snapshotRetainMax : 
changelogRetainMax,
                     changelogRetainMin == null ? snapshotRetainMin : 
changelogRetainMin,
                     changelogTimeRetain == null ? snapshotTimeRetain : 
changelogTimeRetain,
-                    changelogMaxDeletes == null ? snapshotMaxDeletes : 
changelogMaxDeletes);
+                    changelogMaxDeletes == null ? snapshotMaxDeletes : 
changelogMaxDeletes,
+                    consumerChangelogOnly);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index fcb79eb219..beeb6ae2b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -124,8 +124,11 @@ public class ExpireSnapshotsImpl implements 
ExpireSnapshots {
         long maxExclusive = latestSnapshotId - retainMin + 1;
 
         // the snapshot being read by the consumer cannot be deleted
-        maxExclusive =
-                Math.min(maxExclusive, 
consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));
+        if (!expireConfig.isConsumerChangelogOnly()) {
+            maxExclusive =
+                    Math.min(
+                            maxExclusive, 
consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));
+        }
 
         // protected by 'snapshot.expire.limit'
         // (the maximum number of snapshots allowed to expire at a time)
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index d63af5fd2c..01e0473e88 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -24,6 +24,8 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.TestFileStore;
 import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.fs.FileIO;
@@ -661,6 +663,59 @@ public class ExpireSnapshotsTest {
         assertSnapshot(latestSnapshotId, allData, snapshotPositions);
     }
 
+    @Test
+    public void testConsumerChangelogOnly() throws Exception {
+        List<KeyValue> allData = new ArrayList<>();
+        List<Integer> snapshotPositions = new ArrayList<>();
+        commit(10, allData, snapshotPositions);
+
+        // create a consumer at snapshot 3
+        ConsumerManager consumerManager = new ConsumerManager(fileIO, new 
Path(tempDir.toUri()));
+        consumerManager.resetConsumer("myConsumer", new Consumer(3));
+
+        // without consumerChangelogOnly, consumer should prevent snapshot 
expiration
+        ExpireConfig configDefault =
+                ExpireConfig.builder()
+                        .snapshotRetainMin(1)
+                        .snapshotRetainMax(1)
+                        .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE))
+                        .build();
+        store.newExpire(configDefault).expire();
+
+        // earliest snapshot should be 3 (protected by consumer)
+        assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(3L);
+
+        // with consumerChangelogOnly=true, consumer should NOT prevent 
snapshot expiration
+        // but changelog decoupled so changelogs are created
+        ExpireConfig configChangelogOnly =
+                ExpireConfig.builder()
+                        .snapshotRetainMin(1)
+                        .snapshotRetainMax(1)
+                        .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE))
+                        .changelogRetainMax(Integer.MAX_VALUE)
+                        .consumerChangelogOnly(true)
+                        .build();
+        store.newExpire(configChangelogOnly).expire();
+
+        int latestSnapshotId2 = 
requireNonNull(snapshotManager.latestSnapshotId()).intValue();
+        // earliest snapshot should be latestSnapshotId (consumer no longer 
protects snapshots)
+        assertThat(snapshotManager.earliestSnapshotId()).isEqualTo((long) 
latestSnapshotId2);
+        assertSnapshot(latestSnapshotId2, allData, snapshotPositions);
+
+        // changelog expiration should still be protected by consumer
+        ExpireSnapshots changelogExpire = 
store.newChangelogExpire(configChangelogOnly);
+        changelogExpire.expire();
+
+        // earliest changelog should be 3 (still protected by consumer)
+        Long earliestChangelogId = 
changelogManager.earliestLongLivedChangelogId();
+        assertThat(earliestChangelogId).isNotNull();
+        assertThat(earliestChangelogId).isEqualTo(3L);
+
+        // clean up consumer file so assertCleaned passes
+        consumerManager.deleteConsumer("myConsumer");
+        store.assertCleaned();
+    }
+
     private TestFileStore createStore() {
         ThreadLocalRandom random = ThreadLocalRandom.current();
 

Reply via email to