This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5db9bc587e [core] Introduce 'consumer.changelog-only' to keep less
snapshots (#7499)
5db9bc587e is described below
commit 5db9bc587e353a05e3911d364d4e1785132f15e5
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 23 15:37:48 2026 +0800
[core] Introduce 'consumer.changelog-only' to keep less snapshots (#7499)
Introduce `consumer.changelog-only`: if true, consumers only affect
changelog expiration and do not prevent snapshots from being expired.
---
.../shortcodes/generated/core_configuration.html | 6 +++
.../main/java/org/apache/paimon/CoreOptions.java | 13 +++++
.../org/apache/paimon/options/ExpireConfig.java | 18 ++++++-
.../apache/paimon/table/ExpireSnapshotsImpl.java | 7 ++-
.../paimon/operation/ExpireSnapshotsTest.java | 55 ++++++++++++++++++++++
5 files changed, 95 insertions(+), 4 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index e24b4a2795..44aeee4980 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -380,6 +380,12 @@ under the License.
<td>String</td>
<td>Consumer id for recording the offset of consumption in the
storage.</td>
</tr>
+ <tr>
+ <td><h5>consumer.changelog-only</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, consumer will only affect changelog expiration and
will not prevent snapshot from being expired.</td>
+ </tr>
<tr>
<td><h5>consumer.expiration-time</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 78ce3d9367..569615f8b0 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1383,6 +1383,14 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to ignore consumer progress for the newly
started job.");
+ public static final ConfigOption<Boolean> CONSUMER_CHANGELOG_ONLY =
+ key("consumer.changelog-only")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, consumer will only affect changelog
expiration "
+ + "and will not prevent snapshot from
being expired.");
+
public static final ConfigOption<Long> DYNAMIC_BUCKET_TARGET_ROW_NUM =
key("dynamic-bucket.target-row-num")
.longType()
@@ -2738,6 +2746,7 @@ public class CoreOptions implements Serializable {
.changelogRetainMin(options.getOptional(CHANGELOG_NUM_RETAINED_MIN).orElse(null))
.changelogTimeRetain(options.getOptional(CHANGELOG_TIME_RETAINED).orElse(null))
.changelogMaxDeletes(snapshotExpireLimit())
+ .consumerChangelogOnly(consumerChangelogOnly())
.build();
}
@@ -3368,6 +3377,10 @@ public class CoreOptions implements Serializable {
return options.get(CONSUMER_IGNORE_PROGRESS);
}
+ public boolean consumerChangelogOnly() {
+ return options.get(CONSUMER_CHANGELOG_ONLY) &&
changelogLifecycleDecoupled();
+ }
+
public boolean partitionedTableInMetastore() {
return options.get(METASTORE_PARTITIONED_TABLE);
}
diff --git
a/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java
b/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java
index 0de3828db0..bce215ed5d 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java
@@ -31,6 +31,7 @@ public class ExpireConfig {
private final Duration changelogTimeRetain;
private final int changelogMaxDeletes;
private final boolean changelogDecoupled;
+ private final boolean consumerChangelogOnly;
public ExpireConfig(
int snapshotRetainMax,
@@ -40,7 +41,8 @@ public class ExpireConfig {
int changelogRetainMax,
int changelogRetainMin,
Duration changelogTimeRetain,
- int changelogMaxDeletes) {
+ int changelogMaxDeletes,
+ boolean consumerChangelogOnly) {
this.snapshotRetainMax = snapshotRetainMax;
this.snapshotRetainMin = snapshotRetainMin;
this.snapshotTimeRetain = snapshotTimeRetain;
@@ -53,6 +55,7 @@ public class ExpireConfig {
changelogRetainMax > snapshotRetainMax
|| changelogRetainMin > snapshotRetainMin
|| changelogTimeRetain.compareTo(snapshotTimeRetain) >
0;
+ this.consumerChangelogOnly = consumerChangelogOnly;
}
public int getSnapshotRetainMax() {
@@ -91,6 +94,10 @@ public class ExpireConfig {
return changelogDecoupled;
}
+ public boolean isConsumerChangelogOnly() {
+ return consumerChangelogOnly;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -106,6 +113,7 @@ public class ExpireConfig {
private Integer changelogRetainMin = null;
private Duration changelogTimeRetain = null;
private Integer changelogMaxDeletes = null;
+ private boolean consumerChangelogOnly = false;
public static Builder builder() {
return new Builder();
@@ -151,6 +159,11 @@ public class ExpireConfig {
return this;
}
+ public Builder consumerChangelogOnly(boolean consumerChangelogOnly) {
+ this.consumerChangelogOnly = consumerChangelogOnly;
+ return this;
+ }
+
public ExpireConfig build() {
return new ExpireConfig(
snapshotRetainMax,
@@ -160,7 +173,8 @@ public class ExpireConfig {
changelogRetainMax == null ? snapshotRetainMax :
changelogRetainMax,
changelogRetainMin == null ? snapshotRetainMin :
changelogRetainMin,
changelogTimeRetain == null ? snapshotTimeRetain :
changelogTimeRetain,
- changelogMaxDeletes == null ? snapshotMaxDeletes :
changelogMaxDeletes);
+ changelogMaxDeletes == null ? snapshotMaxDeletes :
changelogMaxDeletes,
+ consumerChangelogOnly);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index fcb79eb219..beeb6ae2b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -124,8 +124,11 @@ public class ExpireSnapshotsImpl implements
ExpireSnapshots {
long maxExclusive = latestSnapshotId - retainMin + 1;
// the snapshot being read by the consumer cannot be deleted
- maxExclusive =
- Math.min(maxExclusive,
consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));
+ if (!expireConfig.isConsumerChangelogOnly()) {
+ maxExclusive =
+ Math.min(
+ maxExclusive,
consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));
+ }
// protected by 'snapshot.expire.limit'
// (the maximum number of snapshots allowed to expire at a time)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index d63af5fd2c..01e0473e88 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -24,6 +24,8 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TestFileStore;
import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.FileIO;
@@ -661,6 +663,59 @@ public class ExpireSnapshotsTest {
assertSnapshot(latestSnapshotId, allData, snapshotPositions);
}
+ @Test
+ public void testConsumerChangelogOnly() throws Exception {
+ List<KeyValue> allData = new ArrayList<>();
+ List<Integer> snapshotPositions = new ArrayList<>();
+ commit(10, allData, snapshotPositions);
+
+ // create a consumer at snapshot 3
+ ConsumerManager consumerManager = new ConsumerManager(fileIO, new
Path(tempDir.toUri()));
+ consumerManager.resetConsumer("myConsumer", new Consumer(3));
+
+ // without consumerChangelogOnly, consumer should prevent snapshot
expiration
+ ExpireConfig configDefault =
+ ExpireConfig.builder()
+ .snapshotRetainMin(1)
+ .snapshotRetainMax(1)
+ .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE))
+ .build();
+ store.newExpire(configDefault).expire();
+
+ // earliest snapshot should be 3 (protected by consumer)
+ assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(3L);
+
+ // with consumerChangelogOnly=true, consumer should NOT prevent
snapshot expiration
+ // but changelog decoupled so changelogs are created
+ ExpireConfig configChangelogOnly =
+ ExpireConfig.builder()
+ .snapshotRetainMin(1)
+ .snapshotRetainMax(1)
+ .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE))
+ .changelogRetainMax(Integer.MAX_VALUE)
+ .consumerChangelogOnly(true)
+ .build();
+ store.newExpire(configChangelogOnly).expire();
+
+ int latestSnapshotId2 =
requireNonNull(snapshotManager.latestSnapshotId()).intValue();
+ // earliest snapshot should be latestSnapshotId (consumer no longer
protects snapshots)
+ assertThat(snapshotManager.earliestSnapshotId()).isEqualTo((long)
latestSnapshotId2);
+ assertSnapshot(latestSnapshotId2, allData, snapshotPositions);
+
+ // changelog expiration should still be protected by consumer
+ ExpireSnapshots changelogExpire =
store.newChangelogExpire(configChangelogOnly);
+ changelogExpire.expire();
+
+ // earliest changelog should be 3 (still protected by consumer)
+ Long earliestChangelogId =
changelogManager.earliestLongLivedChangelogId();
+ assertThat(earliestChangelogId).isNotNull();
+ assertThat(earliestChangelogId).isEqualTo(3L);
+
+ // clean up consumer file so assertCleaned passes
+ consumerManager.deleteConsumer("myConsumer");
+ store.assertCleaned();
+ }
+
private TestFileStore createStore() {
ThreadLocalRandom random = ThreadLocalRandom.current();