This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 30d76b9  feat: support time travel by tag name and doc it (#199)
30d76b9 is described below

commit 30d76b9eb99bcf6c65cf0edee4e9a11d88620275
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 3 19:54:33 2026 +0800

    feat: support time travel by tag name and doc it (#199)
---
 .../datafusion/src/relation_planner.rs             |  26 ++++--
 .../integrations/datafusion/tests/read_tables.rs   |  44 +++++++++
 crates/paimon/src/lib.rs                           |   2 +-
 crates/paimon/src/spec/core_options.rs             |   6 ++
 crates/paimon/src/table/mod.rs                     |   2 +
 crates/paimon/src/table/table_scan.rs              |  14 ++-
 crates/paimon/src/table/tag_manager.rs             |  89 ++++++++++++++++++
 dev/spark/provision.py                             |   6 ++
 docs/mkdocs.yml                                    |   1 +
 docs/src/datafusion.md                             | 102 +++++++++++++++++++++
 docs/src/getting-started.md                        |  33 -------
 11 files changed, 281 insertions(+), 44 deletions(-)

diff --git a/crates/integrations/datafusion/src/relation_planner.rs 
b/crates/integrations/datafusion/src/relation_planner.rs
index 1425a5b..87198a4 100644
--- a/crates/integrations/datafusion/src/relation_planner.rs
+++ b/crates/integrations/datafusion/src/relation_planner.rs
@@ -29,7 +29,7 @@ use datafusion::logical_expr::planner::{
     PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
 };
 use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion};
-use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TIMESTAMP_MILLIS_OPTION};
+use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TAG_NAME_OPTION, 
SCAN_TIMESTAMP_MILLIS_OPTION};
 
 use crate::table::PaimonTableProvider;
 
@@ -37,7 +37,8 @@ use crate::table::PaimonTableProvider;
 /// on Paimon tables and resolves them to time travel options.
 ///
 /// - Integer literal → sets `scan.snapshot-id` option on the table.
-/// - String literal → parsed as a timestamp, sets `scan.timestamp-millis` 
option.
+/// - String literal (timestamp) → parsed as a timestamp, sets 
`scan.timestamp-millis` option.
+/// - String literal (other) → sets `scan.tag-name` option on the table.
 #[derive(Debug)]
 pub struct PaimonRelationPlanner;
 
@@ -138,7 +139,8 @@ fn object_name_to_table_reference(
 /// Resolve `FOR SYSTEM_TIME AS OF <expr>` into table options.
 ///
 /// - Integer literal → `{"scan.snapshot-id": "N"}`
-/// - String literal (timestamp) → parse to millis → 
`{"scan.timestamp-millis": "M"}`
+/// - String literal (timestamp `YYYY-MM-DD HH:MM:SS`) → 
`{"scan.timestamp-millis": "M"}`
+/// - String literal (other) → `{"scan.tag-name": "S"}`
 fn resolve_time_travel_options(expr: &ast::Expr) -> DFResult<HashMap<String, 
String>> {
     match expr {
         ast::Expr::Value(v) => match &v.value {
@@ -155,18 +157,24 @@ fn resolve_time_travel_options(expr: &ast::Expr) -> 
DFResult<HashMap<String, Str
                 )]))
             }
             ast::Value::SingleQuotedString(s) | 
ast::Value::DoubleQuotedString(s) => {
-                let timestamp_millis = parse_timestamp_to_millis(s)?;
-                Ok(HashMap::from([(
-                    SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
-                    timestamp_millis.to_string(),
-                )]))
+                // Try parsing as timestamp first; fall back to tag name.
+                match parse_timestamp_to_millis(s) {
+                    Ok(timestamp_millis) => Ok(HashMap::from([(
+                        SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
+                        timestamp_millis.to_string(),
+                    )])),
+                    Err(_) => Ok(HashMap::from([(
+                        SCAN_TAG_NAME_OPTION.to_string(),
+                        s.clone(),
+                    )])),
+                }
             }
             _ => Err(datafusion::error::DataFusionError::Plan(format!(
                 "Unsupported time travel expression: {expr}"
             ))),
         },
         _ => Err(datafusion::error::DataFusionError::Plan(format!(
-            "Unsupported time travel expression: {expr}. Expected an integer 
snapshot id or a timestamp string."
+            "Unsupported time travel expression: {expr}. Expected an integer 
snapshot id, a timestamp string, or a tag name."
         ))),
     }
 }
diff --git a/crates/integrations/datafusion/tests/read_tables.rs 
b/crates/integrations/datafusion/tests/read_tables.rs
index 8f831d7..6a3c995 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -424,3 +424,47 @@ async fn test_time_travel_by_snapshot_id() {
         "Snapshot 2 should contain all rows"
     );
 }
+
+#[tokio::test]
+async fn test_time_travel_by_tag_name() {
+    let ctx = create_time_travel_context().await;
+
+    // Tag 'snapshot1' points to snapshot 1: should contain only (alice, bob)
+    let batches = ctx
+        .sql("SELECT id, name FROM paimon.default.time_travel_table FOR 
SYSTEM_TIME AS OF 'snapshot1'")
+        .await
+        .expect("tag time travel query should parse")
+        .collect()
+        .await
+        .expect("tag time travel query should execute");
+
+    let mut rows = extract_id_name_rows(&batches);
+    rows.sort_by_key(|(id, _)| *id);
+    assert_eq!(
+        rows,
+        vec![(1, "alice".to_string()), (2, "bob".to_string())],
+        "Tag 'snapshot1' should contain only the first batch of rows"
+    );
+
+    // Tag 'snapshot2' points to snapshot 2: should contain all rows
+    let batches = ctx
+        .sql("SELECT id, name FROM paimon.default.time_travel_table FOR 
SYSTEM_TIME AS OF 'snapshot2'")
+        .await
+        .expect("tag time travel query should parse")
+        .collect()
+        .await
+        .expect("tag time travel query should execute");
+
+    let mut rows = extract_id_name_rows(&batches);
+    rows.sort_by_key(|(id, _)| *id);
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".to_string()),
+            (2, "bob".to_string()),
+            (3, "carol".to_string()),
+            (4, "dave".to_string()),
+        ],
+        "Tag 'snapshot2' should contain all rows"
+    );
+}
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index 64a74a3..c5961dd 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -39,5 +39,5 @@ pub use catalog::FileSystemCatalog;
 
 pub use table::{
     DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, 
ReadBuilder, SnapshotManager,
-    Table, TableRead, TableScan,
+    Table, TableRead, TableScan, TagManager,
 };
diff --git a/crates/paimon/src/spec/core_options.rs 
b/crates/paimon/src/spec/core_options.rs
index aeae23c..975f56a 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -25,6 +25,7 @@ const PARTITION_DEFAULT_NAME_OPTION: &str = 
"partition.default-name";
 const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
 pub const SCAN_SNAPSHOT_ID_OPTION: &str = "scan.snapshot-id";
 pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis";
+pub const SCAN_TAG_NAME_OPTION: &str = "scan.tag-name";
 const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
 const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
 const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
@@ -104,6 +105,11 @@ impl<'a> CoreOptions<'a> {
             .get(SCAN_TIMESTAMP_MILLIS_OPTION)
             .and_then(|v| v.parse().ok())
     }
+
+    /// Tag name for time travel via `scan.tag-name`.
+    pub fn scan_tag_name(&self) -> Option<&str> {
+        self.options.get(SCAN_TAG_NAME_OPTION).map(String::as_str)
+    }
 }
 
 /// Parse a memory size string to bytes using binary (1024-based) semantics.
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index d936d7a..32f755a 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -22,6 +22,7 @@ mod read_builder;
 mod snapshot_manager;
 mod source;
 mod table_scan;
+mod tag_manager;
 
 use crate::Result;
 use arrow_array::RecordBatch;
@@ -30,6 +31,7 @@ pub use read_builder::{ReadBuilder, TableRead};
 pub use snapshot_manager::SnapshotManager;
 pub use source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, 
Plan};
 pub use table_scan::TableScan;
+pub use tag_manager::TagManager;
 
 use crate::catalog::Identifier;
 use crate::io::FileIO;
diff --git a/crates/paimon/src/table/table_scan.rs 
b/crates/paimon/src/table/table_scan.rs
index 1c1285d..60d32a5 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -29,6 +29,7 @@ use crate::spec::{
 use crate::table::bin_pack::split_for_batch;
 use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile, 
PartitionBucket, Plan};
 use crate::table::SnapshotManager;
+use crate::table::TagManager;
 use crate::Error;
 use std::collections::{HashMap, HashSet};
 
@@ -253,7 +254,18 @@ impl<'a> TableScan<'a> {
         let snapshot_manager = SnapshotManager::new(file_io.clone(), 
table_path.to_string());
         let core_options = CoreOptions::new(self.table.schema().options());
 
-        let snapshot = if let Some(id) = core_options.scan_snapshot_id() {
+        let snapshot = if let Some(tag_name) = core_options.scan_tag_name() {
+            let tag_manager = TagManager::new(file_io.clone(), 
table_path.to_string());
+            match tag_manager.get(tag_name).await? {
+                Some(s) => s,
+                None => {
+                    return Err(Error::DataInvalid {
+                        message: format!("Tag '{tag_name}' doesn't exist."),
+                        source: None,
+                    })
+                }
+            }
+        } else if let Some(id) = core_options.scan_snapshot_id() {
             snapshot_manager.get_snapshot(id).await?
         } else if let Some(ts) = core_options.scan_timestamp_millis() {
             match snapshot_manager.earlier_or_equal_time_mills(ts).await? {
diff --git a/crates/paimon/src/table/tag_manager.rs 
b/crates/paimon/src/table/tag_manager.rs
new file mode 100644
index 0000000..a907a8a
--- /dev/null
+++ b/crates/paimon/src/table/tag_manager.rs
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tag manager for reading tag metadata using FileIO.
+//!
+//! Reference: 
[org.apache.paimon.utils.TagManager](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java)
+//! and 
[pypaimon.tag.tag_manager.TagManager](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/tag/tag_manager.py).
+
+use crate::io::FileIO;
+use crate::spec::Snapshot;
+
+const TAG_DIR: &str = "tag";
+const TAG_PREFIX: &str = "tag-";
+
+/// Manager for tag files using unified FileIO.
+///
+/// Tags are named snapshots stored as JSON files at 
`{table_path}/tag/tag-{name}`.
+/// The tag file format is identical to a Snapshot JSON file.
+///
+/// Reference: 
[org.apache.paimon.utils.TagManager](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java)
+#[derive(Debug, Clone)]
+pub struct TagManager {
+    file_io: FileIO,
+    table_path: String,
+}
+
+impl TagManager {
+    pub fn new(file_io: FileIO, table_path: String) -> Self {
+        Self {
+            file_io,
+            table_path,
+        }
+    }
+
+    /// Path to the tag directory (e.g. `table_path/tag`).
+    pub fn tag_directory(&self) -> String {
+        format!("{}/{}", self.table_path, TAG_DIR)
+    }
+
+    /// Path to the tag file for the given name (e.g. `tag/tag-my_tag`).
+    pub fn tag_path(&self, tag_name: &str) -> String {
+        format!("{}/{}{}", self.tag_directory(), TAG_PREFIX, tag_name)
+    }
+
+    /// Check if a tag exists.
+    pub async fn tag_exists(&self, tag_name: &str) -> crate::Result<bool> {
+        let path = self.tag_path(tag_name);
+        let input = self.file_io.new_input(&path)?;
+        input.exists().await
+    }
+
+    /// Get the snapshot for a tag, or None if the tag file does not exist.
+    ///
+    /// Tag files are JSON with the same schema as Snapshot.
+    /// Reads directly and catches NotFound to avoid a separate exists() IO 
round-trip.
+    pub async fn get(&self, tag_name: &str) -> crate::Result<Option<Snapshot>> 
{
+        let path = self.tag_path(tag_name);
+        let input = self.file_io.new_input(&path)?;
+        let bytes = match input.read().await {
+            Ok(b) => b,
+            Err(crate::Error::IoUnexpected { ref source, .. })
+                if source.kind() == opendal::ErrorKind::NotFound =>
+            {
+                return Ok(None);
+            }
+            Err(e) => return Err(e),
+        };
+        let snapshot: Snapshot =
+            serde_json::from_slice(&bytes).map_err(|e| 
crate::Error::DataInvalid {
+                message: format!("tag '{tag_name}' JSON invalid: {e}"),
+                source: Some(Box::new(e)),
+            })?;
+        Ok(Some(snapshot))
+    }
+}
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index baa6a04..0a5e13c 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -286,6 +286,12 @@ def main():
         """
     )
 
+    # Create tags for tag-based time travel tests
+    # Tag 'snapshot1' points to snapshot 1 (alice, bob)
+    # Tag 'snapshot2' points to snapshot 2 (alice, bob, carol, dave)
+    spark.sql("CALL sys.create_tag('default.time_travel_table', 'snapshot1', 
1)")
+    spark.sql("CALL sys.create_tag('default.time_travel_table', 'snapshot2', 
2)")
+
 
 if __name__ == "__main__":
     main()
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index 4e59001..37d95e2 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -48,6 +48,7 @@ theme:
 nav:
   - Home: index.md
   - Getting Started: getting-started.md
+  - DataFusion Integration: datafusion.md
   - Architecture: architecture.md
   - Releases: releases.md
   - Contributing: contributing.md
diff --git a/docs/src/datafusion.md b/docs/src/datafusion.md
new file mode 100644
index 0000000..824d1f3
--- /dev/null
+++ b/docs/src/datafusion.md
@@ -0,0 +1,102 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# DataFusion Integration
+
+[Apache DataFusion](https://datafusion.apache.org/) is a fast, extensible 
query engine for building data-centric systems in Rust. The `paimon-datafusion` 
crate provides a read-only integration that lets you query Paimon tables using 
SQL.
+
+## Setup
+
+```toml
+[dependencies]
+paimon = "0.0.0"
+paimon-datafusion = "0.0.0"
+datafusion = "52"
+tokio = { version = "1", features = ["full"] }
+```
+
+## Registering Tables
+
+Register an entire Paimon catalog so all databases and tables are accessible 
via `catalog.database.table` syntax:
+
+```rust
+use std::sync::Arc;
+use datafusion::prelude::SessionContext;
+use paimon_datafusion::PaimonCatalogProvider;
+
+let ctx = SessionContext::new();
+ctx.register_catalog("paimon", 
Arc::new(PaimonCatalogProvider::new(Arc::new(catalog))));
+
+let df = ctx.sql("SELECT * FROM paimon.default.my_table").await?;
+df.show().await?;
+```
+
+## Time Travel
+
+Paimon supports time travel queries to read historical data. In DataFusion, 
this is done via the `FOR SYSTEM_TIME AS OF` clause.
+
+### By Snapshot ID
+
+Read data from a specific snapshot by passing an integer literal:
+
+```sql
+SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 1
+```
+
+This sets the `scan.snapshot-id` option and reads exactly that snapshot.
+
+### By Timestamp
+
+Read data as of a specific point in time by passing a timestamp string in 
`YYYY-MM-DD HH:MM:SS` format:
+
+```sql
+SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF '2024-01-01 
00:00:00'
+```
+
+This finds the latest snapshot whose commit time is less than or equal to the 
given timestamp. The timestamp is interpreted in the local timezone.
+
+### By Tag Name
+
+Read data from a named tag by passing a string that is not a timestamp:
+
+```sql
+SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 'my_tag'
+```
+
+Tags are named snapshots created via Paimon's tag management (e.g., `CALL 
sys.create_tag(...)` in Spark). This is useful for pinning a stable version of 
the data for reproducible queries.
+
+### Enabling Time Travel Syntax
+
+DataFusion requires the BigQuery SQL dialect to parse `FOR SYSTEM_TIME AS OF`. 
You also need to register the `PaimonRelationPlanner`:
+
+```rust
+use std::sync::Arc;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner};
+
+let config = SessionConfig::new()
+    .set_str("datafusion.sql_parser.dialect", "BigQuery");
+let ctx = SessionContext::new_with_config(config);
+
+ctx.register_catalog("paimon", 
Arc::new(PaimonCatalogProvider::new(Arc::new(catalog))));
+ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))?;
+
+// Now time travel queries work
+let df = ctx.sql("SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 
1").await?;
+```
diff --git a/docs/src/getting-started.md b/docs/src/getting-started.md
index dac6af9..7763fea 100644
--- a/docs/src/getting-started.md
+++ b/docs/src/getting-started.md
@@ -161,39 +161,6 @@ while let Some(batch) = stream.next().await {
 }
 ```
 
-## DataFusion Integration
-
-Query Paimon tables using SQL with [Apache 
DataFusion](https://datafusion.apache.org/). Add the integration crate:
-
-```toml
-[dependencies]
-paimon = "0.0.0"
-paimon-datafusion = "0.0.0"
-datafusion = "52"
-```
-
-Register a Paimon table and run SQL queries:
-
-```rust
-use std::sync::Arc;
-use datafusion::prelude::SessionContext;
-use paimon_datafusion::PaimonTableProvider;
-
-// Get a Paimon table from your catalog
-let table = catalog.get_table(&identifier).await?;
-
-// Register as a DataFusion table
-let provider = PaimonTableProvider::try_new(table)?;
-let ctx = SessionContext::new();
-ctx.register_table("my_table", Arc::new(provider))?;
-
-// Query with SQL
-let df = ctx.sql("SELECT * FROM my_table").await?;
-df.show().await?;
-```
-
-> **Note:** The DataFusion integration supports full table scans and column 
projection. Predicate pushdown is not yet implemented.
-
 ## Building from Source
 
 ```bash

Reply via email to