This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit c25ddfa41561977bb1d9b63a4c2af288a1138749 Author: yuzelin <[email protected]> AuthorDate: Tue Apr 7 11:43:43 2026 +0800 [core] Fix TagAutoCreation.forceCreatingSnapshot to use SINK_PROCESS_TIME_ZONE (#7600) TagAutoCreation.forceCreatingSnapshot in the ProcessTimeExtractor branch uses LocalDateTime.now() (machine timezone) to determine whether to force creating a snapshot. When sink.process-time-zone is configured differently from the machine timezone (e.g. UTC on an Asia/Shanghai machine), the tag creation time is incorrect — it triggers at the machine's midnight instead of the configured timezone's midnight. --- .../org/apache/paimon/tag/TagAutoCreation.java | 19 ++++++--- .../org/apache/paimon/tag/TagAutoManagerTest.java | 45 ++++++++++++++++++++++ 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 08dfe25378..5f72763c70 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -20,6 +20,7 @@ package org.apache.paimon.tag; import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor; @@ -60,6 +61,7 @@ public class TagAutoCreation { private final List<TagCallback> callbacks; private final Duration idlenessTimeout; private final boolean automaticCompletion; + private final ZoneId sinkProcessTimeZone; private LocalDateTime nextTag; private long nextSnapshot; @@ -75,7 +77,8 @@ public class TagAutoCreation { @Nullable Duration defaultTimeRetained, Duration idlenessTimeout, boolean automaticCompletion, - List<TagCallback> callbacks) { + List<TagCallback> callbacks, + ZoneId sinkProcessTimeZone) { this.snapshotManager = snapshotManager; this.tagManager = tagManager; this.tagDeletion = tagDeletion; @@ -87,6 +90,7 @@ public class TagAutoCreation { this.callbacks = callbacks; this.idlenessTimeout = idlenessTimeout; this.automaticCompletion = automaticCompletion; + this.sinkProcessTimeZone = sinkProcessTimeZone; this.periodHandler.validateDelay(delay); @@ -123,13 +127,17 @@ public class TagAutoCreation { return isAfterOrEqual(LocalDateTime.now().minus(idlenessTimeout), snapshotTime); } else if (timeExtractor instanceof ProcessTimeExtractor) { - return nextTag == null - || isAfterOrEqual( - LocalDateTime.now().minus(delay), periodHandler.nextTagTime(nextTag)); + return forceCreatingSnapshotProcessTime(LocalDateTime.now(sinkProcessTimeZone)); } return false; } + @VisibleForTesting + boolean forceCreatingSnapshotProcessTime(LocalDateTime now) { + return nextTag == null + || isAfterOrEqual(now.minus(delay), periodHandler.nextTagTime(nextTag)); + } + public void run() { while (true) { if (snapshotManager.snapshotExists(nextSnapshot)) { @@ -230,6 +238,7 @@ public class TagAutoCreation { options.tagDefaultTimeRetained(), options.snapshotWatermarkIdleTimeout(), options.tagAutomaticCompletion(), - callbacks); + callbacks, + options.sinkProcessTimeZone()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java index c8954f7b31..ef0e1627ee 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java @@ -36,6 +36,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Collections; +import static org.apache.paimon.CoreOptions.SINK_PROCESS_TIME_ZONE; import static org.apache.paimon.CoreOptions.SINK_WATERMARK_TIME_ZONE; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN; @@ -519,6 +520,50 @@ public class TagAutoManagerTest extends PrimaryKeyTableTestBase { assertThat(tagManager.allTagNames()).containsOnly("20230718"); } + @Test + public void testForceCreatingSnapshotProcessTime() throws Exception { + // sink.process-time-zone=UTC, machine timezone=Asia/Shanghai. + // Daily tag should trigger at UTC 00:00 (Shanghai 08:00), not Shanghai 00:00. + + Options options = new Options(); + options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.PROCESS_TIME); + options.set(TAG_CREATION_PERIOD, TagCreationPeriod.DAILY); + options.set(SINK_PROCESS_TIME_ZONE, "UTC"); + + FileStoreTable table = this.table.copy(options.toMap()); + + // Commit a snapshot to set nextTag + TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false); + commit.commit(new ManifestCommittable(0)); + commit.close(); + + TagAutoCreation tagAutoCreation = + TagAutoCreation.create( + table.coreOptions(), + table.snapshotManager(), + table.store().newTagManager(), + table.store().newTagDeletion(), + Collections.emptyList()); + + // threshold = tagTime + 2 days (nextTag + 1 period) + TagManager tagManager = table.store().newTagManager(); + String createdTag = tagManager.allTagNames().get(0); + LocalDateTime tagTime = LocalDateTime.parse(createdTag + "T00:00:00"); + LocalDateTime thresholdUtc = tagTime.plusDays(2); + + // Shanghai 00:00 = UTC 16:00 previous day, before threshold -> false + LocalDateTime shanghaiMidnightAsUtc = thresholdUtc.minusHours(8); + assertThat(tagAutoCreation.forceCreatingSnapshotProcessTime(shanghaiMidnightAsUtc)) + .isFalse(); + + // UTC 00:00 = threshold -> true + assertThat(tagAutoCreation.forceCreatingSnapshotProcessTime(thresholdUtc)).isTrue(); + + // After threshold -> true + assertThat(tagAutoCreation.forceCreatingSnapshotProcessTime(thresholdUtc.plusHours(1))) + .isTrue(); + } + private long localZoneMills(String timestamp) { return LocalDateTime.parse(timestamp) .atZone(ZoneId.systemDefault())
