This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b0e605  feat(datafusion): upgrade to DataFusion 53 and use VERSION AS OF (#236)
1b0e605 is described below

commit 1b0e60589eab44c50b5e152bd98d85a3525c086d
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 13 13:04:38 2026 +0800

    feat(datafusion): upgrade to DataFusion 53 and use VERSION AS OF (#236)
    
    Upgrade DataFusion from 52.3 to 53.0 (arrow/parquet 57→58,
    sqlparser 0.59→0.61, orc-rust 0.7→0.8, pyo3 0.26→0.28) and replace
    the old `FOR SYSTEM_TIME AS OF` time travel syntax with the new
    `VERSION AS OF` and `TIMESTAMP AS OF` syntax supported by sqlparser 0.61.
    
    Introduce `scan.version` option to unify snapshot id and tag name
    resolution: at scan time, the version value is resolved by first
    checking if a tag with that name exists, then trying to parse it as
    a snapshot id, otherwise returning an error. Remove the now-redundant
    `scan.snapshot-id` and `scan.tag-name` options.
---
 Cargo.toml                                         |  20 ++--
 bindings/python/Cargo.toml                         |   2 +-
 bindings/python/pyproject.toml                     |   2 +-
 bindings/python/src/context.rs                     |  21 +----
 crates/integration_tests/tests/read_tables.rs      |  32 +++----
 crates/integrations/datafusion/src/ddl.rs          |  26 ++---
 .../datafusion/src/physical_plan/scan.rs           |  12 +--
 .../datafusion/src/relation_planner.rs             |  90 +++++++++---------
 .../integrations/datafusion/tests/read_tables.rs   |  60 +++++++-----
 crates/paimon/Cargo.toml                           |   2 +-
 crates/paimon/src/spec/core_options.rs             | 105 +++++++--------------
 crates/paimon/src/table/table_scan.rs              |  38 ++++----
 crates/test_utils.rs                               |   2 +-
 docs/src/datafusion.md                             |  36 +++----
 14 files changed, 211 insertions(+), 237 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 83fa44b..6b16050 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,15 +28,15 @@ license = "Apache-2.0"
 rust-version = "1.86.0"
 
 [workspace.dependencies]
-arrow = "57.0"
-arrow-array = { version = "57.0", features = ["ffi"] }
-arrow-buffer = "57.0"
-arrow-schema = "57.0"
-arrow-cast = "57.0"
-arrow-ord = "57.0"
-arrow-select = "57.0"
-datafusion = "52.3.0"
-datafusion-ffi = "52.3.0"
-parquet = "57.0"
+arrow = "58.0"
+arrow-array = { version = "58.0", features = ["ffi"] }
+arrow-buffer = "58.0"
+arrow-schema = "58.0"
+arrow-cast = "58.0"
+arrow-ord = "58.0"
+arrow-select = "58.0"
+datafusion = "53.0.0"
+datafusion-ffi = "53.0.0"
+parquet = "58.0"
 tokio = "1.39.2"
 tokio-util = "0.7"
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 617c7f6..6ed2406 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -32,5 +32,5 @@ datafusion = { workspace = true }
 datafusion-ffi = { workspace = true }
 paimon = { path = "../../crates/paimon", features = ["storage-all"] }
 paimon-datafusion = { path = "../../crates/integrations/datafusion" }
-pyo3 = { version = "0.26", features = ["abi3-py310"] }
+pyo3 = { version = "0.28", features = ["abi3-py310"] }
 tokio = { workspace = true }
diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml
index dd5b153..e5c589a 100644
--- a/bindings/python/pyproject.toml
+++ b/bindings/python/pyproject.toml
@@ -53,5 +53,5 @@ dev = [
     "maturin>=1.9.4,<2.0",
     "pytest>=8.0",
     "pyarrow>=17.0",
-    "datafusion==52.3.0",
+    "datafusion==53.0.0",
 ]
diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs
index 6631ff6..2bc1e1d 100644
--- a/bindings/python/src/context.rs
+++ b/bindings/python/src/context.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use std::collections::HashMap;
-use std::ptr::NonNull;
 use std::sync::Arc;
 
 use datafusion::catalog::CatalogProvider;
@@ -24,7 +23,6 @@ use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
 use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
 use paimon::{CatalogFactory, Options};
 use paimon_datafusion::PaimonCatalogProvider;
-use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
 use pyo3::types::PyCapsule;
 
@@ -52,23 +50,8 @@ fn ffi_logical_codec_from_pycapsule(obj: Bound<'_, PyAny>) 
-> PyResult<FFI_Logic
 
     let capsule = capsule.cast::<PyCapsule>()?;
     let expected_name = c"datafusion_logical_extension_codec";
-    match capsule.name()? {
-        Some(name) if name == expected_name => {}
-        Some(name) => {
-            return Err(PyValueError::new_err(format!(
-                "Expected capsule named {expected_name:?}, got {name:?}"
-            )));
-        }
-        None => {
-            return Err(PyValueError::new_err(format!(
-                "Expected capsule named {expected_name:?}, got unnamed capsule"
-            )));
-        }
-    }
-
-    let data = 
NonNull::new(capsule.pointer().cast::<FFI_LogicalExtensionCodec>())
-        .ok_or_else(|| PyValueError::new_err("Null logical extension codec 
capsule pointer"))?;
-    let codec = unsafe { data.as_ref() };
+    let ptr = capsule.pointer_checked(Some(expected_name))?;
+    let codec = unsafe { ptr.cast::<FFI_LogicalExtensionCodec>().as_ref() };
 
     Ok(codec.clone())
 }
diff --git a/crates/integration_tests/tests/read_tables.rs 
b/crates/integration_tests/tests/read_tables.rs
index 5e2ac48..9ed39cf 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -1699,7 +1699,7 @@ async fn test_time_travel_by_snapshot_id() {
 
     // Snapshot 1: (1, 'alice'), (2, 'bob')
     let table_snap1 = table.copy_with_options(HashMap::from([(
-        "scan.snapshot-id".to_string(),
+        "scan.version".to_string(),
         "1".to_string(),
     )]));
     let rb = table_snap1.new_read_builder();
@@ -1720,7 +1720,7 @@ async fn test_time_travel_by_snapshot_id() {
 
     // Snapshot 2: (1, 'alice'), (2, 'bob'), (3, 'carol'), (4, 'dave')
     let table_snap2 = table.copy_with_options(HashMap::from([(
-        "scan.snapshot-id".to_string(),
+        "scan.version".to_string(),
         "2".to_string(),
     )]));
     let rb2 = table_snap2.new_read_builder();
@@ -1753,7 +1753,7 @@ async fn test_time_travel_by_tag_name() {
 
     // Tag 'snapshot1' -> snapshot 1: (1, 'alice'), (2, 'bob')
     let table_tag1 = table.copy_with_options(HashMap::from([(
-        "scan.tag-name".to_string(),
+        "scan.version".to_string(),
         "snapshot1".to_string(),
     )]));
     let rb = table_tag1.new_read_builder();
@@ -1774,7 +1774,7 @@ async fn test_time_travel_by_tag_name() {
 
     // Tag 'snapshot2' -> snapshot 2: all 4 rows
     let table_tag2 = table.copy_with_options(HashMap::from([(
-        "scan.tag-name".to_string(),
+        "scan.version".to_string(),
         "snapshot2".to_string(),
     )]));
     let rb2 = table_tag2.new_read_builder();
@@ -1805,8 +1805,8 @@ async fn test_time_travel_conflicting_selectors_fail() {
     let table = get_table_from_catalog(&catalog, "time_travel_table").await;
 
     let conflicted = table.copy_with_options(HashMap::from([
-        ("scan.tag-name".to_string(), "snapshot1".to_string()),
-        ("scan.snapshot-id".to_string(), "2".to_string()),
+        ("scan.version".to_string(), "snapshot1".to_string()),
+        ("scan.timestamp-millis".to_string(), "1234".to_string()),
     ]));
 
     let plan_err = conflicted
@@ -1823,12 +1823,12 @@ async fn test_time_travel_conflicting_selectors_fail() {
                 "unexpected conflict error: {message}"
             );
             assert!(
-                message.contains("scan.snapshot-id"),
-                "conflict error should mention scan.snapshot-id: {message}"
+                message.contains("scan.version"),
+                "conflict error should mention scan.version: {message}"
             );
             assert!(
-                message.contains("scan.tag-name"),
-                "conflict error should mention scan.tag-name: {message}"
+                message.contains("scan.timestamp-millis"),
+                "conflict error should mention scan.timestamp-millis: 
{message}"
             );
         }
         other => panic!("unexpected error: {other:?}"),
@@ -1836,13 +1836,13 @@ async fn test_time_travel_conflicting_selectors_fail() {
 }
 
 #[tokio::test]
-async fn test_time_travel_invalid_numeric_selector_fails() {
+async fn test_time_travel_invalid_version_fails() {
     let catalog = create_file_system_catalog();
     let table = get_table_from_catalog(&catalog, "time_travel_table").await;
 
     let invalid = table.copy_with_options(HashMap::from([(
-        "scan.snapshot-id".to_string(),
-        "not-a-number".to_string(),
+        "scan.version".to_string(),
+        "nonexistent-tag".to_string(),
     )]));
 
     let plan_err = invalid
@@ -1850,13 +1850,13 @@ async fn 
test_time_travel_invalid_numeric_selector_fails() {
         .new_scan()
         .plan()
         .await
-        .expect_err("invalid numeric time-travel selector should fail");
+        .expect_err("invalid version should fail");
 
     match plan_err {
         Error::DataInvalid { message, .. } => {
             assert!(
-                message.contains("Invalid value for scan.snapshot-id"),
-                "unexpected invalid selector error: {message}"
+                message.contains("is not a valid tag name or snapshot id"),
+                "unexpected invalid version error: {message}"
             );
         }
         other => panic!("unexpected error: {other:?}"),
diff --git a/crates/integrations/datafusion/src/ddl.rs 
b/crates/integrations/datafusion/src/ddl.rs
index 97eb900..feace72 100644
--- a/crates/integrations/datafusion/src/ddl.rs
+++ b/crates/integrations/datafusion/src/ddl.rs
@@ -99,12 +99,14 @@ impl PaimonDdlHandler {
 
         match &statements[0] {
             Statement::CreateTable(create_table) => 
self.handle_create_table(create_table).await,
-            Statement::AlterTable {
-                name,
-                operations,
-                if_exists,
-                ..
-            } => self.handle_alter_table(name, operations, *if_exists).await,
+            Statement::AlterTable(alter_table) => {
+                self.handle_alter_table(
+                    &alter_table.name,
+                    &alter_table.operations,
+                    alter_table.if_exists,
+                )
+                .await
+            }
             _ => self.ctx.sql(sql).await,
         }
     }
@@ -146,12 +148,12 @@ impl PaimonDdlHandler {
 
         // Primary key from constraints: PRIMARY KEY (col, ...)
         for constraint in &ct.constraints {
-            if let 
datafusion::sql::sqlparser::ast::TableConstraint::PrimaryKey {
-                columns, ..
-            } = constraint
-            {
-                let pk_cols: Vec<String> =
-                    columns.iter().map(|c| 
c.column.expr.to_string()).collect();
+            if let 
datafusion::sql::sqlparser::ast::TableConstraint::PrimaryKey(pk) = constraint {
+                let pk_cols: Vec<String> = pk
+                    .columns
+                    .iter()
+                    .map(|c| c.column.expr.to_string())
+                    .collect();
                 builder = builder.primary_key(pk_cols);
             }
         }
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs 
b/crates/integrations/datafusion/src/physical_plan/scan.rs
index a1c35a8..0b2d1f2 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -51,7 +51,7 @@ pub struct PaimonTableScan {
     /// Paimon splits that DataFusion partition `i` will read.
     /// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in 
`execute()`.
     planned_partitions: Vec<Arc<[DataSplit]>>,
-    plan_properties: PlanProperties,
+    plan_properties: Arc<PlanProperties>,
     /// Optional limit on the number of rows to return.
     limit: Option<usize>,
 }
@@ -65,12 +65,12 @@ impl PaimonTableScan {
         planned_partitions: Vec<Arc<[DataSplit]>>,
         limit: Option<usize>,
     ) -> Self {
-        let plan_properties = PlanProperties::new(
+        let plan_properties = Arc::new(PlanProperties::new(
             EquivalenceProperties::new(schema.clone()),
             Partitioning::UnknownPartitioning(planned_partitions.len()),
             EmissionType::Incremental,
             Boundedness::Bounded,
-        );
+        ));
         Self {
             table,
             projected_columns,
@@ -109,7 +109,7 @@ impl ExecutionPlan for PaimonTableScan {
         self
     }
 
-    fn properties(&self) -> &PlanProperties {
+    fn properties(&self) -> &Arc<PlanProperties> {
         &self.plan_properties
     }
 
@@ -168,10 +168,6 @@ impl ExecutionPlan for PaimonTableScan {
         )))
     }
 
-    fn statistics(&self) -> DFResult<Statistics> {
-        self.partition_statistics(None)
-    }
-
     fn partition_statistics(&self, partition: Option<usize>) -> 
DFResult<Statistics> {
         let partitions: &[Arc<[DataSplit]>] = match partition {
             Some(idx) => std::slice::from_ref(&self.planned_partitions[idx]),
diff --git a/crates/integrations/datafusion/src/relation_planner.rs 
b/crates/integrations/datafusion/src/relation_planner.rs
index 87198a4..80b4665 100644
--- a/crates/integrations/datafusion/src/relation_planner.rs
+++ b/crates/integrations/datafusion/src/relation_planner.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Custom [`RelationPlanner`] for Paimon time travel via `FOR SYSTEM_TIME AS 
OF`.
+//! Custom [`RelationPlanner`] for Paimon time travel via `VERSION AS OF` and 
`TIMESTAMP AS OF`.
 
 use std::collections::HashMap;
 use std::fmt::Debug;
@@ -29,16 +29,16 @@ use datafusion::logical_expr::planner::{
     PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
 };
 use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion};
-use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TAG_NAME_OPTION, 
SCAN_TIMESTAMP_MILLIS_OPTION};
+use paimon::spec::{SCAN_TIMESTAMP_MILLIS_OPTION, SCAN_VERSION_OPTION};
 
 use crate::table::PaimonTableProvider;
 
-/// A [`RelationPlanner`] that intercepts `FOR SYSTEM_TIME AS OF` clauses
-/// on Paimon tables and resolves them to time travel options.
+/// A [`RelationPlanner`] that intercepts `VERSION AS OF` and `TIMESTAMP AS OF`
+/// clauses on Paimon tables and resolves them to time travel options.
 ///
-/// - Integer literal → sets `scan.snapshot-id` option on the table.
-/// - String literal (timestamp) → parsed as a timestamp, sets 
`scan.timestamp-millis` option.
-/// - String literal (other) → sets `scan.tag-name` option on the table.
+/// - `VERSION AS OF <integer or string>` → sets `scan.version` option on the 
table.
+///   At scan time, the version is resolved: tag name (if exists) → snapshot 
id → error.
+/// - `TIMESTAMP AS OF <timestamp string>` → parsed as a timestamp, sets 
`scan.timestamp-millis`.
 #[derive(Debug)]
 pub struct PaimonRelationPlanner;
 
@@ -67,12 +67,13 @@ impl RelationPlanner for PaimonRelationPlanner {
             ..
         } = relation
         else {
-            return Ok(RelationPlanning::Original(relation));
+            return Ok(RelationPlanning::Original(Box::new(relation)));
         };
 
-        let version_expr = match version {
-            Some(TableVersion::ForSystemTimeAsOf(expr)) => expr.clone(),
-            _ => return Ok(RelationPlanning::Original(relation)),
+        let extra_options = match version {
+            Some(TableVersion::VersionAsOf(expr)) => 
resolve_version_as_of(expr)?,
+            Some(TableVersion::TimestampAsOf(expr)) => 
resolve_timestamp_as_of(expr)?,
+            _ => return Ok(RelationPlanning::Original(Box::new(relation))),
         };
 
         // Resolve the table reference.
@@ -84,10 +85,9 @@ impl RelationPlanner for PaimonRelationPlanner {
 
         // Check if this is a Paimon table.
         let Some(paimon_provider) = 
provider.as_any().downcast_ref::<PaimonTableProvider>() else {
-            return Ok(RelationPlanning::Original(relation));
+            return Ok(RelationPlanning::Original(Box::new(relation)));
         };
 
-        let extra_options = resolve_time_travel_options(&version_expr)?;
         let new_table = 
paimon_provider.table().copy_with_options(extra_options);
         let new_provider = PaimonTableProvider::try_new(new_table)?;
         let new_source = provider_as_source(Arc::new(new_provider));
@@ -98,7 +98,9 @@ impl RelationPlanner for PaimonRelationPlanner {
         };
 
         let plan = LogicalPlanBuilder::scan(table_ref, new_source, 
None)?.build()?;
-        Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias)))
+        Ok(RelationPlanning::Planned(Box::new(PlannedRelation::new(
+            plan, alias,
+        ))))
     }
 }
 
@@ -136,45 +138,47 @@ fn object_name_to_table_reference(
     }
 }
 
-/// Resolve `FOR SYSTEM_TIME AS OF <expr>` into table options.
+/// Resolve `VERSION AS OF <expr>` into `scan.version` option.
 ///
-/// - Integer literal → `{"scan.snapshot-id": "N"}`
-/// - String literal (timestamp `YYYY-MM-DD HH:MM:SS`) → 
`{"scan.timestamp-millis": "M"}`
-/// - String literal (other) → `{"scan.tag-name": "S"}`
-fn resolve_time_travel_options(expr: &ast::Expr) -> DFResult<HashMap<String, 
String>> {
+/// The raw value (integer or string) is passed through as-is.
+/// Resolution (tag vs snapshot id) happens at scan time in `TableScan`.
+fn resolve_version_as_of(expr: &ast::Expr) -> DFResult<HashMap<String, 
String>> {
+    let version = match expr {
+        ast::Expr::Value(v) => match &v.value {
+            ast::Value::Number(n, _) => n.clone(),
+            ast::Value::SingleQuotedString(s) | 
ast::Value::DoubleQuotedString(s) => s.clone(),
+            _ => {
+                return Err(datafusion::error::DataFusionError::Plan(format!(
+                    "Unsupported VERSION AS OF expression: {expr}"
+                )))
+            }
+        },
+        _ => {
+            return Err(datafusion::error::DataFusionError::Plan(format!(
+                "Unsupported VERSION AS OF expression: {expr}. Expected an 
integer snapshot id or a tag name."
+            )))
+        }
+    };
+    Ok(HashMap::from([(SCAN_VERSION_OPTION.to_string(), version)]))
+}
+
+/// Resolve `TIMESTAMP AS OF <expr>` into `scan.timestamp-millis` option.
+fn resolve_timestamp_as_of(expr: &ast::Expr) -> DFResult<HashMap<String, 
String>> {
     match expr {
         ast::Expr::Value(v) => match &v.value {
-            ast::Value::Number(n, _) => {
-                // Validate it's a valid integer
-                n.parse::<i64>().map_err(|e| {
-                    datafusion::error::DataFusionError::Plan(format!(
-                        "Invalid snapshot id '{n}': {e}"
-                    ))
-                })?;
+            ast::Value::SingleQuotedString(s) | 
ast::Value::DoubleQuotedString(s) => {
+                let millis = parse_timestamp_to_millis(s)?;
                 Ok(HashMap::from([(
-                    SCAN_SNAPSHOT_ID_OPTION.to_string(),
-                    n.clone(),
+                    SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
+                    millis.to_string(),
                 )]))
             }
-            ast::Value::SingleQuotedString(s) | 
ast::Value::DoubleQuotedString(s) => {
-                // Try parsing as timestamp first; fall back to tag name.
-                match parse_timestamp_to_millis(s) {
-                    Ok(timestamp_millis) => Ok(HashMap::from([(
-                        SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
-                        timestamp_millis.to_string(),
-                    )])),
-                    Err(_) => Ok(HashMap::from([(
-                        SCAN_TAG_NAME_OPTION.to_string(),
-                        s.clone(),
-                    )])),
-                }
-            }
             _ => Err(datafusion::error::DataFusionError::Plan(format!(
-                "Unsupported time travel expression: {expr}"
+                "Unsupported TIMESTAMP AS OF expression: {expr}. Expected a 
timestamp string."
             ))),
         },
         _ => Err(datafusion::error::DataFusionError::Plan(format!(
-            "Unsupported time travel expression: {expr}. Expected an integer 
snapshot id, a timestamp string, or a tag name."
+            "Unsupported TIMESTAMP AS OF expression: {expr}. Expected a 
timestamp string."
         ))),
     }
 }
diff --git a/crates/integrations/datafusion/tests/read_tables.rs 
b/crates/integrations/datafusion/tests/read_tables.rs
index 4fc26d6..3b5df63 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -383,10 +383,10 @@ async fn test_missing_database_returns_no_schema() {
 // ======================= Time Travel Tests =======================
 
 /// Helper: create a SessionContext with catalog + relation planner for time 
travel.
-/// Uses BigQuery dialect to enable `FOR SYSTEM_TIME AS OF` syntax.
+/// Uses Databricks dialect to enable `VERSION AS OF` and `TIMESTAMP AS OF` 
syntax.
 async fn create_time_travel_context() -> SessionContext {
     let catalog = create_catalog();
-    let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", 
"BigQuery");
+    let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", 
"Databricks");
     let ctx = SessionContext::new_with_config(config);
     ctx.register_catalog(
         "paimon",
@@ -403,7 +403,7 @@ async fn test_time_travel_by_snapshot_id() {
 
     // Snapshot 1: should contain only the first insert (alice, bob)
     let batches = ctx
-        .sql("SELECT id, name FROM paimon.default.time_travel_table FOR 
SYSTEM_TIME AS OF 1")
+        .sql("SELECT id, name FROM paimon.default.time_travel_table VERSION AS 
OF 1")
         .await
         .expect("time travel query should parse")
         .collect()
@@ -420,7 +420,7 @@ async fn test_time_travel_by_snapshot_id() {
 
     // Snapshot 2 (latest): should contain all rows
     let batches = ctx
-        .sql("SELECT id, name FROM paimon.default.time_travel_table FOR 
SYSTEM_TIME AS OF 2")
+        .sql("SELECT id, name FROM paimon.default.time_travel_table VERSION AS 
OF 2")
         .await
         .expect("time travel query should parse")
         .collect()
@@ -443,11 +443,21 @@ async fn test_time_travel_by_snapshot_id() {
 
 #[tokio::test]
 async fn test_time_travel_by_tag_name() {
-    let ctx = create_time_travel_context().await;
+    // Tag-based time travel uses `scan.version` option directly since
+    // `VERSION AS OF` in SQL only accepts numeric values.
+    let provider = create_provider_with_options(
+        "time_travel_table",
+        HashMap::from([("scan.version".to_string(), "snapshot1".to_string())]),
+    )
+    .await;
+
+    let ctx = SessionContext::new();
+    ctx.register_table("time_travel_table", Arc::new(provider))
+        .expect("Failed to register table");
 
     // Tag 'snapshot1' points to snapshot 1: should contain only (alice, bob)
     let batches = ctx
-        .sql("SELECT id, name FROM paimon.default.time_travel_table FOR 
SYSTEM_TIME AS OF 'snapshot1'")
+        .sql("SELECT id, name FROM time_travel_table")
         .await
         .expect("tag time travel query should parse")
         .collect()
@@ -463,8 +473,18 @@ async fn test_time_travel_by_tag_name() {
     );
 
     // Tag 'snapshot2' points to snapshot 2: should contain all rows
-    let batches = ctx
-        .sql("SELECT id, name FROM paimon.default.time_travel_table FOR 
SYSTEM_TIME AS OF 'snapshot2'")
+    let provider2 = create_provider_with_options(
+        "time_travel_table",
+        HashMap::from([("scan.version".to_string(), "snapshot2".to_string())]),
+    )
+    .await;
+
+    let ctx2 = SessionContext::new();
+    ctx2.register_table("time_travel_table", Arc::new(provider2))
+        .expect("Failed to register table");
+
+    let batches = ctx2
+        .sql("SELECT id, name FROM time_travel_table")
         .await
         .expect("tag time travel query should parse")
         .collect()
@@ -489,11 +509,11 @@ async fn test_time_travel_by_tag_name() {
 async fn test_time_travel_conflicting_selectors_fail() {
     let provider = create_provider_with_options(
         "time_travel_table",
-        HashMap::from([("scan.tag-name".to_string(), 
"snapshot1".to_string())]),
+        HashMap::from([("scan.timestamp-millis".to_string(), 
"1234".to_string())]),
     )
     .await;
 
-    let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", 
"BigQuery");
+    let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", 
"Databricks");
     let ctx = SessionContext::new_with_config(config);
     ctx.register_table("time_travel_table", Arc::new(provider))
         .expect("Failed to register table");
@@ -501,7 +521,7 @@ async fn test_time_travel_conflicting_selectors_fail() {
         .expect("Failed to register relation planner");
 
     let err = ctx
-        .sql("SELECT id, name FROM time_travel_table FOR SYSTEM_TIME AS OF 2")
+        .sql("SELECT id, name FROM time_travel_table VERSION AS OF 2")
         .await
         .expect("time travel query should parse")
         .collect()
@@ -514,20 +534,16 @@ async fn test_time_travel_conflicting_selectors_fail() {
         "unexpected conflict error: {message}"
     );
     assert!(
-        message.contains("scan.snapshot-id"),
-        "conflict error should mention scan.snapshot-id: {message}"
-    );
-    assert!(
-        message.contains("scan.tag-name"),
-        "conflict error should mention scan.tag-name: {message}"
+        message.contains("scan.version"),
+        "conflict error should mention scan.version: {message}"
     );
 }
 
 #[tokio::test]
-async fn test_time_travel_invalid_numeric_selector_fails() {
+async fn test_time_travel_invalid_version_fails() {
     let provider = create_provider_with_options(
         "time_travel_table",
-        HashMap::from([("scan.snapshot-id".to_string(), 
"not-a-number".to_string())]),
+        HashMap::from([("scan.version".to_string(), 
"nonexistent-tag".to_string())]),
     )
     .await;
 
@@ -541,12 +557,12 @@ async fn 
test_time_travel_invalid_numeric_selector_fails() {
         .expect("query should parse")
         .collect()
         .await
-        .expect_err("invalid numeric time-travel selector should fail");
+        .expect_err("invalid version should fail");
 
     let message = err.to_string();
     assert!(
-        message.contains("Invalid value for scan.snapshot-id"),
-        "unexpected invalid selector error: {message}"
+        message.contains("is not a valid tag name or snapshot id"),
+        "unexpected invalid version error: {message}"
     );
 }
 
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 908781f..7477a3b 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -68,7 +68,7 @@ arrow-select = { workspace = true }
 futures = "0.3"
 tokio-util = { workspace = true, features = ["compat"] }
 parquet = { workspace = true, features = ["async", "zstd", "lz4", "snap"] }
-orc-rust = "0.7.0"
+orc-rust = "0.8.0"
 async-stream = "0.3.6"
 reqwest = { version = "0.12", features = ["json"] }
 # DLF authentication dependencies
diff --git a/crates/paimon/src/spec/core_options.rs 
b/crates/paimon/src/spec/core_options.rs
index bb52022..eb8da39 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -40,9 +40,8 @@ const DEFAULT_COMMIT_MAX_RETRIES: u32 = 10;
 const DEFAULT_COMMIT_TIMEOUT_MS: u64 = 120_000;
 const DEFAULT_COMMIT_MIN_RETRY_WAIT_MS: u64 = 1_000;
 const DEFAULT_COMMIT_MAX_RETRY_WAIT_MS: u64 = 10_000;
-pub const SCAN_SNAPSHOT_ID_OPTION: &str = "scan.snapshot-id";
 pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis";
-pub const SCAN_TAG_NAME_OPTION: &str = "scan.tag-name";
+pub const SCAN_VERSION_OPTION: &str = "scan.version";
 const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
 const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
 const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
@@ -57,11 +56,12 @@ pub struct CoreOptions<'a> {
     options: &'a HashMap<String, String>,
 }
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub(crate) enum TimeTravelSelector<'a> {
-    TagName(&'a str),
-    SnapshotId(i64),
     TimestampMillis(i64),
+    /// Raw version string from `VERSION AS OF`. Resolved at scan time:
+    /// tag name (if tag exists) → snapshot id (if parseable as i64) → error.
+    Version(&'a str),
 }
 
 impl<'a> CoreOptions<'a> {
@@ -138,17 +138,6 @@ impl<'a> CoreOptions<'a> {
         }
     }
 
-    /// Raw snapshot id accessor for `scan.snapshot-id`.
-    ///
-    /// This compatibility accessor is lossy: it returns `None` for absent or
-    /// invalid values and does not validate selector conflicts. Internal
-    /// time-travel planning should use `try_time_travel_selector`.
-    pub fn scan_snapshot_id(&self) -> Option<i64> {
-        self.options
-            .get(SCAN_SNAPSHOT_ID_OPTION)
-            .and_then(|v| v.parse().ok())
-    }
-
     /// Raw timestamp accessor for `scan.timestamp-millis`.
     ///
     /// This compatibility accessor is lossy: it returns `None` for absent or
@@ -160,25 +149,14 @@ impl<'a> CoreOptions<'a> {
             .and_then(|v| v.parse().ok())
     }
 
-    /// Raw tag name accessor for `scan.tag-name`.
-    ///
-    /// This compatibility accessor does not validate selector conflicts.
-    /// Internal time-travel planning should use `try_time_travel_selector`.
-    pub fn scan_tag_name(&self) -> Option<&'a str> {
-        self.options.get(SCAN_TAG_NAME_OPTION).map(String::as_str)
-    }
-
     fn configured_time_travel_selectors(&self) -> Vec<&'static str> {
-        let mut selectors = Vec::with_capacity(3);
-        if self.options.contains_key(SCAN_TAG_NAME_OPTION) {
-            selectors.push(SCAN_TAG_NAME_OPTION);
-        }
-        if self.options.contains_key(SCAN_SNAPSHOT_ID_OPTION) {
-            selectors.push(SCAN_SNAPSHOT_ID_OPTION);
-        }
+        let mut selectors = Vec::with_capacity(2);
         if self.options.contains_key(SCAN_TIMESTAMP_MILLIS_OPTION) {
             selectors.push(SCAN_TIMESTAMP_MILLIS_OPTION);
         }
+        if self.options.contains_key(SCAN_VERSION_OPTION) {
+            selectors.push(SCAN_VERSION_OPTION);
+        }
         selectors
     }
 
@@ -198,12 +176,10 @@ impl<'a> CoreOptions<'a> {
             });
         }
 
-        if let Some(tag_name) = self.scan_tag_name() {
-            Ok(Some(TimeTravelSelector::TagName(tag_name)))
-        } else if let Some(id) = 
self.parse_i64_option(SCAN_SNAPSHOT_ID_OPTION)? {
-            Ok(Some(TimeTravelSelector::SnapshotId(id)))
-        } else if let Some(ts) = 
self.parse_i64_option(SCAN_TIMESTAMP_MILLIS_OPTION)? {
+        if let Some(ts) = self.parse_i64_option(SCAN_TIMESTAMP_MILLIS_OPTION)? 
{
             Ok(Some(TimeTravelSelector::TimestampMillis(ts)))
+        } else if let Some(version) = 
self.options.get(SCAN_VERSION_OPTION).map(String::as_str) {
+            Ok(Some(TimeTravelSelector::Version(version)))
         } else {
             Ok(None)
         }
@@ -408,8 +384,8 @@ mod tests {
     #[test]
     fn test_try_time_travel_selector_rejects_conflicting_selectors() {
         let options = HashMap::from([
-            (SCAN_TAG_NAME_OPTION.to_string(), "tag1".to_string()),
-            (SCAN_SNAPSHOT_ID_OPTION.to_string(), "7".to_string()),
+            (SCAN_VERSION_OPTION.to_string(), "tag1".to_string()),
+            (SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "1234".to_string()),
         ]);
         let core = CoreOptions::new(&options);
 
@@ -419,8 +395,8 @@ mod tests {
         match err {
             crate::Error::DataInvalid { message, .. } => {
                 assert!(message.contains("Only one time-travel selector may be 
set"));
-                assert!(message.contains(SCAN_TAG_NAME_OPTION));
-                assert!(message.contains(SCAN_SNAPSHOT_ID_OPTION));
+                assert!(message.contains(SCAN_VERSION_OPTION));
+                assert!(message.contains(SCAN_TIMESTAMP_MILLIS_OPTION));
             }
             other => panic!("unexpected error: {other:?}"),
         }
@@ -428,20 +404,6 @@ mod tests {
 
     #[test]
     fn test_try_time_travel_selector_rejects_invalid_numeric_values() {
-        let snapshot_options =
-            HashMap::from([(SCAN_SNAPSHOT_ID_OPTION.to_string(), 
"abc".to_string())]);
-        let snapshot_core = CoreOptions::new(&snapshot_options);
-
-        let snapshot_err = snapshot_core
-            .try_time_travel_selector()
-            .expect_err("invalid snapshot id should fail");
-        match snapshot_err {
-            crate::Error::DataInvalid { message, .. } => {
-                assert!(message.contains(SCAN_SNAPSHOT_ID_OPTION));
-            }
-            other => panic!("unexpected error: {other:?}"),
-        }
-
         let timestamp_options =
             HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), 
"xyz".to_string())]);
         let timestamp_core = CoreOptions::new(&timestamp_options);
@@ -490,31 +452,34 @@ mod tests {
 
     #[test]
     fn test_try_time_travel_selector_normalizes_valid_selector() {
-        let tag_options = HashMap::from([(SCAN_TAG_NAME_OPTION.to_string(), 
"tag1".to_string())]);
-        let tag_core = CoreOptions::new(&tag_options);
+        let timestamp_options =
+            HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), 
"1234".to_string())]);
+        let timestamp_core = CoreOptions::new(&timestamp_options);
         assert_eq!(
-            tag_core.try_time_travel_selector().expect("tag selector"),
-            Some(TimeTravelSelector::TagName("tag1"))
+            timestamp_core
+                .try_time_travel_selector()
+                .expect("timestamp selector"),
+            Some(TimeTravelSelector::TimestampMillis(1234))
         );
 
-        let snapshot_options =
-            HashMap::from([(SCAN_SNAPSHOT_ID_OPTION.to_string(), 
"7".to_string())]);
-        let snapshot_core = CoreOptions::new(&snapshot_options);
+        let version_options =
+            HashMap::from([(SCAN_VERSION_OPTION.to_string(), 
"my-tag".to_string())]);
+        let version_core = CoreOptions::new(&version_options);
         assert_eq!(
-            snapshot_core
+            version_core
                 .try_time_travel_selector()
-                .expect("snapshot selector"),
-            Some(TimeTravelSelector::SnapshotId(7))
+                .expect("version selector"),
+            Some(TimeTravelSelector::Version("my-tag"))
         );
 
-        let timestamp_options =
-            HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), 
"1234".to_string())]);
-        let timestamp_core = CoreOptions::new(&timestamp_options);
+        let version_num_options =
+            HashMap::from([(SCAN_VERSION_OPTION.to_string(), 
"3".to_string())]);
+        let version_num_core = CoreOptions::new(&version_num_options);
         assert_eq!(
-            timestamp_core
+            version_num_core
                 .try_time_travel_selector()
-                .expect("timestamp selector"),
-            Some(TimeTravelSelector::TimestampMillis(1234))
+                .expect("version numeric selector"),
+            Some(TimeTravelSelector::Version("3"))
         );
     }
 
diff --git a/crates/paimon/src/table/table_scan.rs 
b/crates/paimon/src/table/table_scan.rs
index 1d2f621..a21b214 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -345,9 +345,8 @@ impl<'a> TableScan<'a> {
     /// Plan the full scan: resolve snapshot (via options or latest), then 
read manifests and build DataSplits.
     ///
     /// Time travel is resolved from table options:
-    /// - only one of `scan.tag-name`, `scan.snapshot-id`, 
`scan.timestamp-millis` may be set
-    /// - `scan.tag-name` → read the snapshot pinned by that tag
-    /// - `scan.snapshot-id` → read that specific snapshot
+    /// - only one of `scan.version`, `scan.timestamp-millis` may be set
+    /// - `scan.version` → tag name (if exists) → snapshot id (if parseable) → 
error
     /// - `scan.timestamp-millis` → find the latest snapshot <= that timestamp
     /// - otherwise → read the latest snapshot
     ///
@@ -367,19 +366,6 @@ impl<'a> TableScan<'a> {
         let core_options = CoreOptions::new(self.table.schema().options());
 
         match core_options.try_time_travel_selector()? {
-            Some(TimeTravelSelector::TagName(tag_name)) => {
-                let tag_manager = TagManager::new(file_io.clone(), 
table_path.to_string());
-                match tag_manager.get(tag_name).await? {
-                    Some(s) => Ok(Some(s)),
-                    None => Err(Error::DataInvalid {
-                        message: format!("Tag '{tag_name}' doesn't exist."),
-                        source: None,
-                    }),
-                }
-            }
-            Some(TimeTravelSelector::SnapshotId(id)) => {
-                snapshot_manager.get_snapshot(id).await.map(Some)
-            }
             Some(TimeTravelSelector::TimestampMillis(ts)) => {
                 match snapshot_manager.earlier_or_equal_time_mills(ts).await? {
                     Some(s) => Ok(Some(s)),
@@ -389,6 +375,26 @@ impl<'a> TableScan<'a> {
                     }),
                 }
             }
+            Some(TimeTravelSelector::Version(v)) => {
+                // Tag first, then snapshot id, else error.
+                let tag_manager = TagManager::new(file_io.clone(), 
table_path.to_string());
+                if tag_manager.tag_exists(v).await? {
+                    match tag_manager.get(v).await? {
+                        Some(s) => Ok(Some(s)),
+                        None => Err(Error::DataInvalid {
+                            message: format!("Tag '{v}' doesn't exist."),
+                            source: None,
+                        }),
+                    }
+                } else if let Ok(id) = v.parse::<i64>() {
+                    snapshot_manager.get_snapshot(id).await.map(Some)
+                } else {
+                    Err(Error::DataInvalid {
+                        message: format!("Version '{v}' is not a valid tag 
name or snapshot id."),
+                        source: None,
+                    })
+                }
+            }
             None => snapshot_manager.get_latest_snapshot().await,
         }
     }
diff --git a/crates/test_utils.rs b/crates/test_utils.rs
index cc88192..4813cc5 100644
--- a/crates/test_utils.rs
+++ b/crates/test_utils.rs
@@ -45,7 +45,7 @@ pub(crate) fn write_int_parquet_file(
 
     let props = max_row_group_size.map(|size| {
         WriterProperties::builder()
-            .set_max_row_group_size(size)
+            .set_max_row_group_row_count(Some(size))
             .build()
     });
     let file = File::create(path).unwrap();
diff --git a/docs/src/datafusion.md b/docs/src/datafusion.md
index 18930b6..df8c4b8 100644
--- a/docs/src/datafusion.md
+++ b/docs/src/datafusion.md
@@ -27,7 +27,7 @@ under the License.
 [dependencies]
 paimon = "0.1.0"
 paimon-datafusion = "0.1.0"
-datafusion = "52"
+datafusion = "53"
 tokio = { version = "1", features = ["full"] }
 ```
 
@@ -49,41 +49,43 @@ df.show().await?;
 
 ## Time Travel
 
-Paimon supports time travel queries to read historical data. In DataFusion, 
this is done via the `FOR SYSTEM_TIME AS OF` clause.
+Paimon supports time travel queries to read historical data. In DataFusion, 
this is done via the `VERSION AS OF` and `TIMESTAMP AS OF` clauses.
 
 ### By Snapshot ID
 
-Read data from a specific snapshot by passing an integer literal:
+Read data from a specific snapshot by passing an integer:
 
 ```sql
-SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 1
+SELECT * FROM paimon.default.my_table VERSION AS OF 1
 ```
 
-This sets the `scan.snapshot-id` option and reads exactly that snapshot.
+This sets the `scan.version` option. At scan time, the value is resolved as a 
tag name first, then as a snapshot ID (a plain integer normally matches a snapshot).
 
-### By Timestamp
+### By Tag Name
 
-Read data as of a specific point in time by passing a timestamp string in 
`YYYY-MM-DD HH:MM:SS` format:
+Read data from a named tag. Since `VERSION AS OF` in SQL only accepts numeric 
values, tag-based time travel is done via the `scan.version` table option:
 
-```sql
-SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF '2024-01-01 
00:00:00'
+```rust
+let table = table.copy_with_options(HashMap::from([
+    ("scan.version".to_string(), "my_tag".to_string()),
+]));
 ```
 
-This finds the latest snapshot whose commit time is less than or equal to the 
given timestamp. The timestamp is interpreted in the local timezone.
+At scan time, the version value is resolved by first checking if a tag with 
that name exists, then trying to parse it as a snapshot ID. If neither matches, 
an error is returned.
 
-### By Tag Name
+### By Timestamp
 
-Read data from a named tag by passing a string that is not a timestamp:
+Read data as of a specific point in time by passing a timestamp string in 
`YYYY-MM-DD HH:MM:SS` format:
 
 ```sql
-SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 'my_tag'
+SELECT * FROM paimon.default.my_table TIMESTAMP AS OF '2024-01-01 00:00:00'
 ```
 
-Tags are named snapshots created via Paimon's tag management (e.g., `CALL 
sys.create_tag(...)` in Spark). This is useful for pinning a stable version of 
the data for reproducible queries.
+This finds the latest snapshot whose commit time is less than or equal to the 
given timestamp. The timestamp is interpreted in the local timezone.
 
 ### Enabling Time Travel Syntax
 
-DataFusion requires the BigQuery SQL dialect to parse `FOR SYSTEM_TIME AS OF`. 
You also need to register the `PaimonRelationPlanner`:
+DataFusion requires the Databricks SQL dialect to parse `VERSION AS OF` and 
`TIMESTAMP AS OF`. You also need to register the `PaimonRelationPlanner`:
 
 ```rust
 use std::sync::Arc;
@@ -91,12 +93,12 @@ use datafusion::prelude::{SessionConfig, SessionContext};
 use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner};
 
 let config = SessionConfig::new()
-    .set_str("datafusion.sql_parser.dialect", "BigQuery");
+    .set_str("datafusion.sql_parser.dialect", "Databricks");
 let ctx = SessionContext::new_with_config(config);
 
 ctx.register_catalog("paimon", 
Arc::new(PaimonCatalogProvider::new(Arc::new(catalog))));
 ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))?;
 
 // Now time travel queries work
-let df = ctx.sql("SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 
1").await?;
+let df = ctx.sql("SELECT * FROM paimon.default.my_table VERSION AS OF 
1").await?;
 ```


Reply via email to