This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0223348b6c [core] Fix TagAutoCreation.forceCreatingSnapshot to use
SINK_PROCESS_TIME_ZONE (#7600)
0223348b6c is described below
commit 0223348b6c62ee6697ac56fe23afb5d3cfac80f9
Author: yuzelin <[email protected]>
AuthorDate: Tue Apr 7 11:43:43 2026 +0800
[core] Fix TagAutoCreation.forceCreatingSnapshot to use
SINK_PROCESS_TIME_ZONE (#7600)
TagAutoCreation.forceCreatingSnapshot in the ProcessTimeExtractor branch
uses LocalDateTime.now() (machine timezone) to determine whether to
force creating a snapshot. When sink.process-time-zone is configured
differently from the machine timezone (e.g. UTC on an Asia/Shanghai
machine), the tag creation time is incorrect — it triggers at the
machine's midnight instead of the configured timezone's midnight.
---
.../org/apache/paimon/tag/TagAutoCreation.java | 19 ++++++---
.../org/apache/paimon/tag/TagAutoManagerTest.java | 45 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 5 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
index 08dfe25378..5f72763c70 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java
@@ -20,6 +20,7 @@ package org.apache.paimon.tag;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor;
@@ -60,6 +61,7 @@ public class TagAutoCreation {
private final List<TagCallback> callbacks;
private final Duration idlenessTimeout;
private final boolean automaticCompletion;
+ private final ZoneId sinkProcessTimeZone;
private LocalDateTime nextTag;
private long nextSnapshot;
@@ -75,7 +77,8 @@ public class TagAutoCreation {
@Nullable Duration defaultTimeRetained,
Duration idlenessTimeout,
boolean automaticCompletion,
- List<TagCallback> callbacks) {
+ List<TagCallback> callbacks,
+ ZoneId sinkProcessTimeZone) {
this.snapshotManager = snapshotManager;
this.tagManager = tagManager;
this.tagDeletion = tagDeletion;
@@ -87,6 +90,7 @@ public class TagAutoCreation {
this.callbacks = callbacks;
this.idlenessTimeout = idlenessTimeout;
this.automaticCompletion = automaticCompletion;
+ this.sinkProcessTimeZone = sinkProcessTimeZone;
this.periodHandler.validateDelay(delay);
@@ -123,13 +127,17 @@ public class TagAutoCreation {
return isAfterOrEqual(LocalDateTime.now().minus(idlenessTimeout),
snapshotTime);
} else if (timeExtractor instanceof ProcessTimeExtractor) {
- return nextTag == null
- || isAfterOrEqual(
- LocalDateTime.now().minus(delay),
periodHandler.nextTagTime(nextTag));
+ return
forceCreatingSnapshotProcessTime(LocalDateTime.now(sinkProcessTimeZone));
}
return false;
}
+ @VisibleForTesting
+ boolean forceCreatingSnapshotProcessTime(LocalDateTime now) {
+ return nextTag == null
+ || isAfterOrEqual(now.minus(delay),
periodHandler.nextTagTime(nextTag));
+ }
+
public void run() {
while (true) {
if (snapshotManager.snapshotExists(nextSnapshot)) {
@@ -230,6 +238,7 @@ public class TagAutoCreation {
options.tagDefaultTimeRetained(),
options.snapshotWatermarkIdleTimeout(),
options.tagAutomaticCompletion(),
- callbacks);
+ callbacks,
+ options.sinkProcessTimeZone());
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
index c8954f7b31..ef0e1627ee 100644
--- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java
@@ -36,6 +36,7 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
+import static org.apache.paimon.CoreOptions.SINK_PROCESS_TIME_ZONE;
import static org.apache.paimon.CoreOptions.SINK_WATERMARK_TIME_ZONE;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
@@ -519,6 +520,50 @@ public class TagAutoManagerTest extends
PrimaryKeyTableTestBase {
assertThat(tagManager.allTagNames()).containsOnly("20230718");
}
+ @Test
+ public void testForceCreatingSnapshotProcessTime() throws Exception {
+ // sink.process-time-zone=UTC, machine timezone=Asia/Shanghai.
+ // Daily tag should trigger at UTC 00:00 (Shanghai 08:00), not
Shanghai 00:00.
+
+ Options options = new Options();
+ options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.PROCESS_TIME);
+ options.set(TAG_CREATION_PERIOD, TagCreationPeriod.DAILY);
+ options.set(SINK_PROCESS_TIME_ZONE, "UTC");
+
+ FileStoreTable table = this.table.copy(options.toMap());
+
+ // Commit a snapshot to set nextTag
+ TableCommitImpl commit =
table.newCommit(commitUser).ignoreEmptyCommit(false);
+ commit.commit(new ManifestCommittable(0));
+ commit.close();
+
+ TagAutoCreation tagAutoCreation =
+ TagAutoCreation.create(
+ table.coreOptions(),
+ table.snapshotManager(),
+ table.store().newTagManager(),
+ table.store().newTagDeletion(),
+ Collections.emptyList());
+
+ // threshold = tagTime + 2 days (nextTag + 1 period)
+ TagManager tagManager = table.store().newTagManager();
+ String createdTag = tagManager.allTagNames().get(0);
+ LocalDateTime tagTime = LocalDateTime.parse(createdTag + "T00:00:00");
+ LocalDateTime thresholdUtc = tagTime.plusDays(2);
+
+ // Shanghai 00:00 = UTC 16:00 previous day, before threshold -> false
+ LocalDateTime shanghaiMidnightAsUtc = thresholdUtc.minusHours(8);
+
assertThat(tagAutoCreation.forceCreatingSnapshotProcessTime(shanghaiMidnightAsUtc))
+ .isFalse();
+
+ // UTC 00:00 = threshold -> true
+
assertThat(tagAutoCreation.forceCreatingSnapshotProcessTime(thresholdUtc)).isTrue();
+
+ // After threshold -> true
+
assertThat(tagAutoCreation.forceCreatingSnapshotProcessTime(thresholdUtc.plusHours(1)))
+ .isTrue();
+ }
+
private long localZoneMills(String timestamp) {
return LocalDateTime.parse(timestamp)
.atZone(ZoneId.systemDefault())