This is an automated email from the ASF dual-hosted git repository.
hope pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 30d76b9 feat: support time travel by tag name and doc it (#199)
30d76b9 is described below
commit 30d76b9eb99bcf6c65cf0edee4e9a11d88620275
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 3 19:54:33 2026 +0800
feat: support time travel by tag name and doc it (#199)
---
.../datafusion/src/relation_planner.rs | 26 ++++--
.../integrations/datafusion/tests/read_tables.rs | 44 +++++++++
crates/paimon/src/lib.rs | 2 +-
crates/paimon/src/spec/core_options.rs | 6 ++
crates/paimon/src/table/mod.rs | 2 +
crates/paimon/src/table/table_scan.rs | 14 ++-
crates/paimon/src/table/tag_manager.rs | 89 ++++++++++++++++++
dev/spark/provision.py | 6 ++
docs/mkdocs.yml | 1 +
docs/src/datafusion.md | 102 +++++++++++++++++++++
docs/src/getting-started.md | 33 -------
11 files changed, 281 insertions(+), 44 deletions(-)
diff --git a/crates/integrations/datafusion/src/relation_planner.rs
b/crates/integrations/datafusion/src/relation_planner.rs
index 1425a5b..87198a4 100644
--- a/crates/integrations/datafusion/src/relation_planner.rs
+++ b/crates/integrations/datafusion/src/relation_planner.rs
@@ -29,7 +29,7 @@ use datafusion::logical_expr::planner::{
PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
};
use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion};
-use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TIMESTAMP_MILLIS_OPTION};
+use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TAG_NAME_OPTION,
SCAN_TIMESTAMP_MILLIS_OPTION};
use crate::table::PaimonTableProvider;
@@ -37,7 +37,8 @@ use crate::table::PaimonTableProvider;
/// on Paimon tables and resolves them to time travel options.
///
/// - Integer literal → sets `scan.snapshot-id` option on the table.
-/// - String literal → parsed as a timestamp, sets `scan.timestamp-millis`
option.
+/// - String literal (timestamp) → parsed as a timestamp, sets
`scan.timestamp-millis` option.
+/// - String literal (other) → sets `scan.tag-name` option on the table.
#[derive(Debug)]
pub struct PaimonRelationPlanner;
@@ -138,7 +139,8 @@ fn object_name_to_table_reference(
/// Resolve `FOR SYSTEM_TIME AS OF <expr>` into table options.
///
/// - Integer literal → `{"scan.snapshot-id": "N"}`
-/// - String literal (timestamp) → parse to millis →
`{"scan.timestamp-millis": "M"}`
+/// - String literal (timestamp `YYYY-MM-DD HH:MM:SS`) →
`{"scan.timestamp-millis": "M"}`
+/// - String literal (other) → `{"scan.tag-name": "S"}`
fn resolve_time_travel_options(expr: &ast::Expr) -> DFResult<HashMap<String,
String>> {
match expr {
ast::Expr::Value(v) => match &v.value {
@@ -155,18 +157,24 @@ fn resolve_time_travel_options(expr: &ast::Expr) ->
DFResult<HashMap<String, Str
)]))
}
ast::Value::SingleQuotedString(s) |
ast::Value::DoubleQuotedString(s) => {
- let timestamp_millis = parse_timestamp_to_millis(s)?;
- Ok(HashMap::from([(
- SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
- timestamp_millis.to_string(),
- )]))
+ // Try parsing as timestamp first; fall back to tag name.
+ match parse_timestamp_to_millis(s) {
+ Ok(timestamp_millis) => Ok(HashMap::from([(
+ SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
+ timestamp_millis.to_string(),
+ )])),
+ Err(_) => Ok(HashMap::from([(
+ SCAN_TAG_NAME_OPTION.to_string(),
+ s.clone(),
+ )])),
+ }
}
_ => Err(datafusion::error::DataFusionError::Plan(format!(
"Unsupported time travel expression: {expr}"
))),
},
_ => Err(datafusion::error::DataFusionError::Plan(format!(
- "Unsupported time travel expression: {expr}. Expected an integer
snapshot id or a timestamp string."
+ "Unsupported time travel expression: {expr}. Expected an integer
snapshot id, a timestamp string, or a tag name."
))),
}
}
diff --git a/crates/integrations/datafusion/tests/read_tables.rs
b/crates/integrations/datafusion/tests/read_tables.rs
index 8f831d7..6a3c995 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -424,3 +424,47 @@ async fn test_time_travel_by_snapshot_id() {
"Snapshot 2 should contain all rows"
);
}
+
+#[tokio::test]
+async fn test_time_travel_by_tag_name() {
+ let ctx = create_time_travel_context().await;
+
+ // Tag 'snapshot1' (created in dev/spark/provision.py) points to snapshot 1: only (alice, bob)
+ let batches = ctx
+ .sql("SELECT id, name FROM paimon.default.time_travel_table FOR
SYSTEM_TIME AS OF 'snapshot1'")
+ .await
+ .expect("tag time travel query should parse")
+ .collect()
+ .await
+ .expect("tag time travel query should execute");
+
+ let mut rows = extract_id_name_rows(&batches);
+ rows.sort_by_key(|(id, _)| *id);
+ assert_eq!(
+ rows,
+ vec![(1, "alice".to_string()), (2, "bob".to_string())],
+ "Tag 'snapshot1' should contain only the first batch of rows"
+ );
+
+ // Tag 'snapshot2' (created in dev/spark/provision.py) points to snapshot 2: all four rows
+ let batches = ctx
+ .sql("SELECT id, name FROM paimon.default.time_travel_table FOR
SYSTEM_TIME AS OF 'snapshot2'")
+ .await
+ .expect("tag time travel query should parse")
+ .collect()
+ .await
+ .expect("tag time travel query should execute");
+
+ let mut rows = extract_id_name_rows(&batches);
+ rows.sort_by_key(|(id, _)| *id);
+ assert_eq!(
+ rows,
+ vec![
+ (1, "alice".to_string()),
+ (2, "bob".to_string()),
+ (3, "carol".to_string()),
+ (4, "dave".to_string()),
+ ],
+ "Tag 'snapshot2' should contain all rows"
+ );
+}
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index 64a74a3..c5961dd 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -39,5 +39,5 @@ pub use catalog::FileSystemCatalog;
pub use table::{
DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan,
ReadBuilder, SnapshotManager,
- Table, TableRead, TableScan,
+ Table, TableRead, TableScan, TagManager,
};
diff --git a/crates/paimon/src/spec/core_options.rs
b/crates/paimon/src/spec/core_options.rs
index aeae23c..975f56a 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -25,6 +25,7 @@ const PARTITION_DEFAULT_NAME_OPTION: &str =
"partition.default-name";
const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
pub const SCAN_SNAPSHOT_ID_OPTION: &str = "scan.snapshot-id";
pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis";
+pub const SCAN_TAG_NAME_OPTION: &str = "scan.tag-name";
const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
@@ -104,6 +105,11 @@ impl<'a> CoreOptions<'a> {
.get(SCAN_TIMESTAMP_MILLIS_OPTION)
.and_then(|v| v.parse().ok())
}
+
+ /// Tag name for time travel, read from the `scan.tag-name` option; `None` if the option is unset.
+ pub fn scan_tag_name(&self) -> Option<&str> {
+ self.options.get(SCAN_TAG_NAME_OPTION).map(String::as_str)
+ }
}
/// Parse a memory size string to bytes using binary (1024-based) semantics.
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index d936d7a..32f755a 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -22,6 +22,7 @@ mod read_builder;
mod snapshot_manager;
mod source;
mod table_scan;
+mod tag_manager;
use crate::Result;
use arrow_array::RecordBatch;
@@ -30,6 +31,7 @@ pub use read_builder::{ReadBuilder, TableRead};
pub use snapshot_manager::SnapshotManager;
pub use source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket,
Plan};
pub use table_scan::TableScan;
+pub use tag_manager::TagManager;
use crate::catalog::Identifier;
use crate::io::FileIO;
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index 1c1285d..60d32a5 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -29,6 +29,7 @@ use crate::spec::{
use crate::table::bin_pack::split_for_batch;
use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile,
PartitionBucket, Plan};
use crate::table::SnapshotManager;
+use crate::table::TagManager;
use crate::Error;
use std::collections::{HashMap, HashSet};
@@ -253,7 +254,18 @@ impl<'a> TableScan<'a> {
let snapshot_manager = SnapshotManager::new(file_io.clone(),
table_path.to_string());
let core_options = CoreOptions::new(self.table.schema().options());
- let snapshot = if let Some(id) = core_options.scan_snapshot_id() {
+ let snapshot = if let Some(tag_name) = core_options.scan_tag_name() {
+ let tag_manager = TagManager::new(file_io.clone(),
table_path.to_string());
+ match tag_manager.get(tag_name).await? {
+ Some(s) => s,
+ None => {
+ return Err(Error::DataInvalid {
+ message: format!("Tag '{tag_name}' doesn't exist."),
+ source: None,
+ })
+ }
+ }
+ } else if let Some(id) = core_options.scan_snapshot_id() {
snapshot_manager.get_snapshot(id).await?
} else if let Some(ts) = core_options.scan_timestamp_millis() {
match snapshot_manager.earlier_or_equal_time_mills(ts).await? {
diff --git a/crates/paimon/src/table/tag_manager.rs
b/crates/paimon/src/table/tag_manager.rs
new file mode 100644
index 0000000..a907a8a
--- /dev/null
+++ b/crates/paimon/src/table/tag_manager.rs
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tag manager for reading tag metadata using FileIO.
+//!
+//! Reference:
[org.apache.paimon.utils.TagManager](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java)
+//! and
[pypaimon.tag.tag_manager.TagManager](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/tag/tag_manager.py).
+
+use crate::io::FileIO;
+use crate::spec::Snapshot;
+
+const TAG_DIR: &str = "tag";
+const TAG_PREFIX: &str = "tag-";
+
+/// Reads a table's tag files through [`FileIO`].
+///
+/// Tags are named snapshots stored as JSON files at
`{table_path}/tag/tag-{name}`.
+/// The tag file format is identical to a Snapshot JSON file.
+///
+/// Reference:
[org.apache.paimon.utils.TagManager](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java)
+#[derive(Debug, Clone)]
+pub struct TagManager {
+ file_io: FileIO,
+ table_path: String,
+}
+
+impl TagManager {
+ pub fn new(file_io: FileIO, table_path: String) -> Self {
+ Self {
+ file_io,
+ table_path,
+ }
+ }
+
+ /// Path to the tag directory: `{table_path}/tag`.
+ pub fn tag_directory(&self) -> String {
+ format!("{}/{}", self.table_path, TAG_DIR)
+ }
+
+ /// Path to the tag file, `{table_path}/tag/tag-{tag_name}`. NOTE(review): `tag_name` is appended verbatim; `/` or `..` are not rejected, so the path may point outside `tag/`.
+ pub fn tag_path(&self, tag_name: &str) -> String {
+ format!("{}/{}{}", self.tag_directory(), TAG_PREFIX, tag_name)
+ }
+
+ /// Returns whether the tag file for `tag_name` exists.
+ pub async fn tag_exists(&self, tag_name: &str) -> crate::Result<bool> {
+ let path = self.tag_path(tag_name);
+ let input = self.file_io.new_input(&path)?;
+ input.exists().await
+ }
+
+ /// Read the snapshot stored in a tag; returns `Ok(None)` when the tag file is not found.
+ ///
+ /// Tag files use Snapshot JSON; invalid JSON yields `Error::DataInvalid`, other I/O errors propagate.
+ /// Reads directly and catches NotFound to avoid a separate exists() IO
round-trip.
+ pub async fn get(&self, tag_name: &str) -> crate::Result<Option<Snapshot>>
{
+ let path = self.tag_path(tag_name);
+ let input = self.file_io.new_input(&path)?;
+ let bytes = match input.read().await {
+ Ok(b) => b,
+ Err(crate::Error::IoUnexpected { ref source, .. })
+ if source.kind() == opendal::ErrorKind::NotFound =>
+ {
+ return Ok(None);
+ }
+ Err(e) => return Err(e),
+ };
+ let snapshot: Snapshot =
+ serde_json::from_slice(&bytes).map_err(|e|
crate::Error::DataInvalid {
+ message: format!("tag '{tag_name}' JSON invalid: {e}"),
+ source: Some(Box::new(e)),
+ })?;
+ Ok(Some(snapshot))
+ }
+}
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index baa6a04..0a5e13c 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -286,6 +286,12 @@ def main():
"""
)
+ # Create tags for tag-based time travel tests
+ # Tag 'snapshot1' points to snapshot 1 (alice, bob)
+ # Tag 'snapshot2' points to snapshot 2 (alice, bob, carol, dave)
+ spark.sql("CALL sys.create_tag('default.time_travel_table', 'snapshot1',
1)")
+ spark.sql("CALL sys.create_tag('default.time_travel_table', 'snapshot2',
2)")
+
if __name__ == "__main__":
main()
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index 4e59001..37d95e2 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -48,6 +48,7 @@ theme:
nav:
- Home: index.md
- Getting Started: getting-started.md
+ - DataFusion Integration: datafusion.md
- Architecture: architecture.md
- Releases: releases.md
- Contributing: contributing.md
diff --git a/docs/src/datafusion.md b/docs/src/datafusion.md
new file mode 100644
index 0000000..824d1f3
--- /dev/null
+++ b/docs/src/datafusion.md
@@ -0,0 +1,102 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# DataFusion Integration
+
+[Apache DataFusion](https://datafusion.apache.org/) is a fast, extensible
query engine for building data-centric systems in Rust. The `paimon-datafusion`
crate provides a read-only integration that lets you query Paimon tables using
SQL.
+
+## Setup
+
+```toml
+[dependencies]
+paimon = "0.0.0"
+paimon-datafusion = "0.0.0"
+datafusion = "52"
+tokio = { version = "1", features = ["full"] }
+```
+
+## Registering Tables
+
+Register an entire Paimon catalog so all databases and tables are accessible
via `catalog.database.table` syntax:
+
+```rust
+use std::sync::Arc;
+use datafusion::prelude::SessionContext;
+use paimon_datafusion::PaimonCatalogProvider;
+
+let ctx = SessionContext::new();
+ctx.register_catalog("paimon",
Arc::new(PaimonCatalogProvider::new(Arc::new(catalog))));
+
+let df = ctx.sql("SELECT * FROM paimon.default.my_table").await?;
+df.show().await?;
+```
+
+## Time Travel
+
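+Paimon supports time travel queries to read historical data. In DataFusion, this is done via the `FOR SYSTEM_TIME AS OF` clause, which requires the session setup described in [Enabling Time Travel Syntax](#enabling-time-travel-syntax).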
+
+### By Snapshot ID
+
+Read data from a specific snapshot by passing an integer literal:
+
+```sql
+SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 1
+```
+
+This sets the `scan.snapshot-id` option and reads exactly that snapshot.
+
+### By Timestamp
+
+Read data as of a specific point in time by passing a timestamp string in
`YYYY-MM-DD HH:MM:SS` format:
+
+```sql
+SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF '2024-01-01
00:00:00'
+```
+
+This finds the latest snapshot whose commit time is less than or equal to the
given timestamp. The timestamp is interpreted in the local timezone.
+
+### By Tag Name
+
+Read data from a named tag by passing any string that does not parse as a `YYYY-MM-DD HH:MM:SS` timestamp:
+
+```sql
+SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 'my_tag'
+```
+
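+Tags are named snapshots created via Paimon's tag management (e.g., `CALL sys.create_tag(...)` in Spark), which makes them useful for pinning a stable version of the data for reproducible queries. Because any string that is not a valid timestamp is treated as a tag name, a mistyped timestamp fails with a `Tag '...' doesn't exist.` error rather than a timestamp parse error.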
+
+### Enabling Time Travel Syntax
+
+DataFusion requires the BigQuery SQL dialect to parse `FOR SYSTEM_TIME AS OF`.
You also need to register the `PaimonRelationPlanner`:
+
+```rust
+use std::sync::Arc;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner};
+
+let config = SessionConfig::new()
+ .set_str("datafusion.sql_parser.dialect", "BigQuery");
+let ctx = SessionContext::new_with_config(config);
+
+ctx.register_catalog("paimon",
Arc::new(PaimonCatalogProvider::new(Arc::new(catalog))));
+ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))?;
+
+// Now time travel queries work
+let df = ctx.sql("SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF
1").await?;
+```
diff --git a/docs/src/getting-started.md b/docs/src/getting-started.md
index dac6af9..7763fea 100644
--- a/docs/src/getting-started.md
+++ b/docs/src/getting-started.md
@@ -161,39 +161,6 @@ while let Some(batch) = stream.next().await {
}
```
-## DataFusion Integration
-
-Query Paimon tables using SQL with [Apache
DataFusion](https://datafusion.apache.org/). Add the integration crate:
-
-```toml
-[dependencies]
-paimon = "0.0.0"
-paimon-datafusion = "0.0.0"
-datafusion = "52"
-```
-
-Register a Paimon table and run SQL queries:
-
-```rust
-use std::sync::Arc;
-use datafusion::prelude::SessionContext;
-use paimon_datafusion::PaimonTableProvider;
-
-// Get a Paimon table from your catalog
-let table = catalog.get_table(&identifier).await?;
-
-// Register as a DataFusion table
-let provider = PaimonTableProvider::try_new(table)?;
-let ctx = SessionContext::new();
-ctx.register_table("my_table", Arc::new(provider))?;
-
-// Query with SQL
-let df = ctx.sql("SELECT * FROM my_table").await?;
-df.show().await?;
-```
-
-> **Note:** The DataFusion integration supports full table scans and column
projection. Predicate pushdown is not yet implemented.
-
## Building from Source
```bash