This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1b0e605 feat(datafusion): upgrade to DataFusion 53 and use VERSION AS
OF (#236)
1b0e605 is described below
commit 1b0e60589eab44c50b5e152bd98d85a3525c086d
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 13 13:04:38 2026 +0800
feat(datafusion): upgrade to DataFusion 53 and use VERSION AS OF (#236)
Upgrade DataFusion from 52.3 to 53.0 (arrow/parquet 57→58, sqlparser
0.59→0.61,
orc-rust 0.7→0.8, pyo3 0.26→0.28) and replace the old `FOR SYSTEM_TIME AS
OF`
time travel syntax with the new `VERSION AS OF` and `TIMESTAMP AS OF` syntax
supported by sqlparser 0.61.
Introduce `scan.version` option to unify snapshot id and tag name
resolution:
at scan time, the version value is resolved by first checking if a tag with
that name exists, then trying to parse it as a snapshot id, otherwise
returning
an error. Remove the now-redundant `scan.snapshot-id` and `scan.tag-name`
options.
---
Cargo.toml | 20 ++--
bindings/python/Cargo.toml | 2 +-
bindings/python/pyproject.toml | 2 +-
bindings/python/src/context.rs | 21 +----
crates/integration_tests/tests/read_tables.rs | 32 +++----
crates/integrations/datafusion/src/ddl.rs | 26 ++---
.../datafusion/src/physical_plan/scan.rs | 12 +--
.../datafusion/src/relation_planner.rs | 90 +++++++++---------
.../integrations/datafusion/tests/read_tables.rs | 60 +++++++-----
crates/paimon/Cargo.toml | 2 +-
crates/paimon/src/spec/core_options.rs | 105 +++++++--------------
crates/paimon/src/table/table_scan.rs | 38 ++++----
crates/test_utils.rs | 2 +-
docs/src/datafusion.md | 36 +++----
14 files changed, 211 insertions(+), 237 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 83fa44b..6b16050 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,15 +28,15 @@ license = "Apache-2.0"
rust-version = "1.86.0"
[workspace.dependencies]
-arrow = "57.0"
-arrow-array = { version = "57.0", features = ["ffi"] }
-arrow-buffer = "57.0"
-arrow-schema = "57.0"
-arrow-cast = "57.0"
-arrow-ord = "57.0"
-arrow-select = "57.0"
-datafusion = "52.3.0"
-datafusion-ffi = "52.3.0"
-parquet = "57.0"
+arrow = "58.0"
+arrow-array = { version = "58.0", features = ["ffi"] }
+arrow-buffer = "58.0"
+arrow-schema = "58.0"
+arrow-cast = "58.0"
+arrow-ord = "58.0"
+arrow-select = "58.0"
+datafusion = "53.0.0"
+datafusion-ffi = "53.0.0"
+parquet = "58.0"
tokio = "1.39.2"
tokio-util = "0.7"
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 617c7f6..6ed2406 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -32,5 +32,5 @@ datafusion = { workspace = true }
datafusion-ffi = { workspace = true }
paimon = { path = "../../crates/paimon", features = ["storage-all"] }
paimon-datafusion = { path = "../../crates/integrations/datafusion" }
-pyo3 = { version = "0.26", features = ["abi3-py310"] }
+pyo3 = { version = "0.28", features = ["abi3-py310"] }
tokio = { workspace = true }
diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml
index dd5b153..e5c589a 100644
--- a/bindings/python/pyproject.toml
+++ b/bindings/python/pyproject.toml
@@ -53,5 +53,5 @@ dev = [
"maturin>=1.9.4,<2.0",
"pytest>=8.0",
"pyarrow>=17.0",
- "datafusion==52.3.0",
+ "datafusion==53.0.0",
]
diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs
index 6631ff6..2bc1e1d 100644
--- a/bindings/python/src/context.rs
+++ b/bindings/python/src/context.rs
@@ -16,7 +16,6 @@
// under the License.
use std::collections::HashMap;
-use std::ptr::NonNull;
use std::sync::Arc;
use datafusion::catalog::CatalogProvider;
@@ -24,7 +23,6 @@ use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
use paimon::{CatalogFactory, Options};
use paimon_datafusion::PaimonCatalogProvider;
-use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
@@ -52,23 +50,8 @@ fn ffi_logical_codec_from_pycapsule(obj: Bound<'_, PyAny>)
-> PyResult<FFI_Logic
let capsule = capsule.cast::<PyCapsule>()?;
let expected_name = c"datafusion_logical_extension_codec";
- match capsule.name()? {
- Some(name) if name == expected_name => {}
- Some(name) => {
- return Err(PyValueError::new_err(format!(
- "Expected capsule named {expected_name:?}, got {name:?}"
- )));
- }
- None => {
- return Err(PyValueError::new_err(format!(
- "Expected capsule named {expected_name:?}, got unnamed capsule"
- )));
- }
- }
-
- let data =
NonNull::new(capsule.pointer().cast::<FFI_LogicalExtensionCodec>())
- .ok_or_else(|| PyValueError::new_err("Null logical extension codec
capsule pointer"))?;
- let codec = unsafe { data.as_ref() };
+ let ptr = capsule.pointer_checked(Some(expected_name))?;
+ let codec = unsafe { ptr.cast::<FFI_LogicalExtensionCodec>().as_ref() };
Ok(codec.clone())
}
diff --git a/crates/integration_tests/tests/read_tables.rs
b/crates/integration_tests/tests/read_tables.rs
index 5e2ac48..9ed39cf 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -1699,7 +1699,7 @@ async fn test_time_travel_by_snapshot_id() {
// Snapshot 1: (1, 'alice'), (2, 'bob')
let table_snap1 = table.copy_with_options(HashMap::from([(
- "scan.snapshot-id".to_string(),
+ "scan.version".to_string(),
"1".to_string(),
)]));
let rb = table_snap1.new_read_builder();
@@ -1720,7 +1720,7 @@ async fn test_time_travel_by_snapshot_id() {
// Snapshot 2: (1, 'alice'), (2, 'bob'), (3, 'carol'), (4, 'dave')
let table_snap2 = table.copy_with_options(HashMap::from([(
- "scan.snapshot-id".to_string(),
+ "scan.version".to_string(),
"2".to_string(),
)]));
let rb2 = table_snap2.new_read_builder();
@@ -1753,7 +1753,7 @@ async fn test_time_travel_by_tag_name() {
// Tag 'snapshot1' -> snapshot 1: (1, 'alice'), (2, 'bob')
let table_tag1 = table.copy_with_options(HashMap::from([(
- "scan.tag-name".to_string(),
+ "scan.version".to_string(),
"snapshot1".to_string(),
)]));
let rb = table_tag1.new_read_builder();
@@ -1774,7 +1774,7 @@ async fn test_time_travel_by_tag_name() {
// Tag 'snapshot2' -> snapshot 2: all 4 rows
let table_tag2 = table.copy_with_options(HashMap::from([(
- "scan.tag-name".to_string(),
+ "scan.version".to_string(),
"snapshot2".to_string(),
)]));
let rb2 = table_tag2.new_read_builder();
@@ -1805,8 +1805,8 @@ async fn test_time_travel_conflicting_selectors_fail() {
let table = get_table_from_catalog(&catalog, "time_travel_table").await;
let conflicted = table.copy_with_options(HashMap::from([
- ("scan.tag-name".to_string(), "snapshot1".to_string()),
- ("scan.snapshot-id".to_string(), "2".to_string()),
+ ("scan.version".to_string(), "snapshot1".to_string()),
+ ("scan.timestamp-millis".to_string(), "1234".to_string()),
]));
let plan_err = conflicted
@@ -1823,12 +1823,12 @@ async fn test_time_travel_conflicting_selectors_fail() {
"unexpected conflict error: {message}"
);
assert!(
- message.contains("scan.snapshot-id"),
- "conflict error should mention scan.snapshot-id: {message}"
+ message.contains("scan.version"),
+ "conflict error should mention scan.version: {message}"
);
assert!(
- message.contains("scan.tag-name"),
- "conflict error should mention scan.tag-name: {message}"
+ message.contains("scan.timestamp-millis"),
+ "conflict error should mention scan.timestamp-millis:
{message}"
);
}
other => panic!("unexpected error: {other:?}"),
@@ -1836,13 +1836,13 @@ async fn test_time_travel_conflicting_selectors_fail() {
}
#[tokio::test]
-async fn test_time_travel_invalid_numeric_selector_fails() {
+async fn test_time_travel_invalid_version_fails() {
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, "time_travel_table").await;
let invalid = table.copy_with_options(HashMap::from([(
- "scan.snapshot-id".to_string(),
- "not-a-number".to_string(),
+ "scan.version".to_string(),
+ "nonexistent-tag".to_string(),
)]));
let plan_err = invalid
@@ -1850,13 +1850,13 @@ async fn
test_time_travel_invalid_numeric_selector_fails() {
.new_scan()
.plan()
.await
- .expect_err("invalid numeric time-travel selector should fail");
+ .expect_err("invalid version should fail");
match plan_err {
Error::DataInvalid { message, .. } => {
assert!(
- message.contains("Invalid value for scan.snapshot-id"),
- "unexpected invalid selector error: {message}"
+ message.contains("is not a valid tag name or snapshot id"),
+ "unexpected invalid version error: {message}"
);
}
other => panic!("unexpected error: {other:?}"),
diff --git a/crates/integrations/datafusion/src/ddl.rs
b/crates/integrations/datafusion/src/ddl.rs
index 97eb900..feace72 100644
--- a/crates/integrations/datafusion/src/ddl.rs
+++ b/crates/integrations/datafusion/src/ddl.rs
@@ -99,12 +99,14 @@ impl PaimonDdlHandler {
match &statements[0] {
Statement::CreateTable(create_table) =>
self.handle_create_table(create_table).await,
- Statement::AlterTable {
- name,
- operations,
- if_exists,
- ..
- } => self.handle_alter_table(name, operations, *if_exists).await,
+ Statement::AlterTable(alter_table) => {
+ self.handle_alter_table(
+ &alter_table.name,
+ &alter_table.operations,
+ alter_table.if_exists,
+ )
+ .await
+ }
_ => self.ctx.sql(sql).await,
}
}
@@ -146,12 +148,12 @@ impl PaimonDdlHandler {
// Primary key from constraints: PRIMARY KEY (col, ...)
for constraint in &ct.constraints {
- if let
datafusion::sql::sqlparser::ast::TableConstraint::PrimaryKey {
- columns, ..
- } = constraint
- {
- let pk_cols: Vec<String> =
- columns.iter().map(|c|
c.column.expr.to_string()).collect();
+ if let
datafusion::sql::sqlparser::ast::TableConstraint::PrimaryKey(pk) = constraint {
+ let pk_cols: Vec<String> = pk
+ .columns
+ .iter()
+ .map(|c| c.column.expr.to_string())
+ .collect();
builder = builder.primary_key(pk_cols);
}
}
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs
b/crates/integrations/datafusion/src/physical_plan/scan.rs
index a1c35a8..0b2d1f2 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -51,7 +51,7 @@ pub struct PaimonTableScan {
/// Paimon splits that DataFusion partition `i` will read.
/// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in
`execute()`.
planned_partitions: Vec<Arc<[DataSplit]>>,
- plan_properties: PlanProperties,
+ plan_properties: Arc<PlanProperties>,
/// Optional limit on the number of rows to return.
limit: Option<usize>,
}
@@ -65,12 +65,12 @@ impl PaimonTableScan {
planned_partitions: Vec<Arc<[DataSplit]>>,
limit: Option<usize>,
) -> Self {
- let plan_properties = PlanProperties::new(
+ let plan_properties = Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::UnknownPartitioning(planned_partitions.len()),
EmissionType::Incremental,
Boundedness::Bounded,
- );
+ ));
Self {
table,
projected_columns,
@@ -109,7 +109,7 @@ impl ExecutionPlan for PaimonTableScan {
self
}
- fn properties(&self) -> &PlanProperties {
+ fn properties(&self) -> &Arc<PlanProperties> {
&self.plan_properties
}
@@ -168,10 +168,6 @@ impl ExecutionPlan for PaimonTableScan {
)))
}
- fn statistics(&self) -> DFResult<Statistics> {
- self.partition_statistics(None)
- }
-
fn partition_statistics(&self, partition: Option<usize>) ->
DFResult<Statistics> {
let partitions: &[Arc<[DataSplit]>] = match partition {
Some(idx) => std::slice::from_ref(&self.planned_partitions[idx]),
diff --git a/crates/integrations/datafusion/src/relation_planner.rs
b/crates/integrations/datafusion/src/relation_planner.rs
index 87198a4..80b4665 100644
--- a/crates/integrations/datafusion/src/relation_planner.rs
+++ b/crates/integrations/datafusion/src/relation_planner.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Custom [`RelationPlanner`] for Paimon time travel via `FOR SYSTEM_TIME AS
OF`.
+//! Custom [`RelationPlanner`] for Paimon time travel via `VERSION AS OF` and
`TIMESTAMP AS OF`.
use std::collections::HashMap;
use std::fmt::Debug;
@@ -29,16 +29,16 @@ use datafusion::logical_expr::planner::{
PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
};
use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion};
-use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TAG_NAME_OPTION,
SCAN_TIMESTAMP_MILLIS_OPTION};
+use paimon::spec::{SCAN_TIMESTAMP_MILLIS_OPTION, SCAN_VERSION_OPTION};
use crate::table::PaimonTableProvider;
-/// A [`RelationPlanner`] that intercepts `FOR SYSTEM_TIME AS OF` clauses
-/// on Paimon tables and resolves them to time travel options.
+/// A [`RelationPlanner`] that intercepts `VERSION AS OF` and `TIMESTAMP AS OF`
+/// clauses on Paimon tables and resolves them to time travel options.
///
-/// - Integer literal → sets `scan.snapshot-id` option on the table.
-/// - String literal (timestamp) → parsed as a timestamp, sets
`scan.timestamp-millis` option.
-/// - String literal (other) → sets `scan.tag-name` option on the table.
+/// - `VERSION AS OF <integer or string>` → sets `scan.version` option on the
table.
+/// At scan time, the version is resolved: tag name (if exists) → snapshot
id → error.
+/// - `TIMESTAMP AS OF <timestamp string>` → parsed as a timestamp, sets
`scan.timestamp-millis`.
#[derive(Debug)]
pub struct PaimonRelationPlanner;
@@ -67,12 +67,13 @@ impl RelationPlanner for PaimonRelationPlanner {
..
} = relation
else {
- return Ok(RelationPlanning::Original(relation));
+ return Ok(RelationPlanning::Original(Box::new(relation)));
};
- let version_expr = match version {
- Some(TableVersion::ForSystemTimeAsOf(expr)) => expr.clone(),
- _ => return Ok(RelationPlanning::Original(relation)),
+ let extra_options = match version {
+ Some(TableVersion::VersionAsOf(expr)) =>
resolve_version_as_of(expr)?,
+ Some(TableVersion::TimestampAsOf(expr)) =>
resolve_timestamp_as_of(expr)?,
+ _ => return Ok(RelationPlanning::Original(Box::new(relation))),
};
// Resolve the table reference.
@@ -84,10 +85,9 @@ impl RelationPlanner for PaimonRelationPlanner {
// Check if this is a Paimon table.
let Some(paimon_provider) =
provider.as_any().downcast_ref::<PaimonTableProvider>() else {
- return Ok(RelationPlanning::Original(relation));
+ return Ok(RelationPlanning::Original(Box::new(relation)));
};
- let extra_options = resolve_time_travel_options(&version_expr)?;
let new_table =
paimon_provider.table().copy_with_options(extra_options);
let new_provider = PaimonTableProvider::try_new(new_table)?;
let new_source = provider_as_source(Arc::new(new_provider));
@@ -98,7 +98,9 @@ impl RelationPlanner for PaimonRelationPlanner {
};
let plan = LogicalPlanBuilder::scan(table_ref, new_source,
None)?.build()?;
- Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias)))
+ Ok(RelationPlanning::Planned(Box::new(PlannedRelation::new(
+ plan, alias,
+ ))))
}
}
@@ -136,45 +138,47 @@ fn object_name_to_table_reference(
}
}
-/// Resolve `FOR SYSTEM_TIME AS OF <expr>` into table options.
+/// Resolve `VERSION AS OF <expr>` into `scan.version` option.
///
-/// - Integer literal → `{"scan.snapshot-id": "N"}`
-/// - String literal (timestamp `YYYY-MM-DD HH:MM:SS`) →
`{"scan.timestamp-millis": "M"}`
-/// - String literal (other) → `{"scan.tag-name": "S"}`
-fn resolve_time_travel_options(expr: &ast::Expr) -> DFResult<HashMap<String,
String>> {
+/// The raw value (integer or string) is passed through as-is.
+/// Resolution (tag vs snapshot id) happens at scan time in `TableScan`.
+fn resolve_version_as_of(expr: &ast::Expr) -> DFResult<HashMap<String,
String>> {
+ let version = match expr {
+ ast::Expr::Value(v) => match &v.value {
+ ast::Value::Number(n, _) => n.clone(),
+ ast::Value::SingleQuotedString(s) |
ast::Value::DoubleQuotedString(s) => s.clone(),
+ _ => {
+ return Err(datafusion::error::DataFusionError::Plan(format!(
+ "Unsupported VERSION AS OF expression: {expr}"
+ )))
+ }
+ },
+ _ => {
+ return Err(datafusion::error::DataFusionError::Plan(format!(
+ "Unsupported VERSION AS OF expression: {expr}. Expected an
integer snapshot id or a tag name."
+ )))
+ }
+ };
+ Ok(HashMap::from([(SCAN_VERSION_OPTION.to_string(), version)]))
+}
+
+/// Resolve `TIMESTAMP AS OF <expr>` into `scan.timestamp-millis` option.
+fn resolve_timestamp_as_of(expr: &ast::Expr) -> DFResult<HashMap<String,
String>> {
match expr {
ast::Expr::Value(v) => match &v.value {
- ast::Value::Number(n, _) => {
- // Validate it's a valid integer
- n.parse::<i64>().map_err(|e| {
- datafusion::error::DataFusionError::Plan(format!(
- "Invalid snapshot id '{n}': {e}"
- ))
- })?;
+ ast::Value::SingleQuotedString(s) |
ast::Value::DoubleQuotedString(s) => {
+ let millis = parse_timestamp_to_millis(s)?;
Ok(HashMap::from([(
- SCAN_SNAPSHOT_ID_OPTION.to_string(),
- n.clone(),
+ SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
+ millis.to_string(),
)]))
}
- ast::Value::SingleQuotedString(s) |
ast::Value::DoubleQuotedString(s) => {
- // Try parsing as timestamp first; fall back to tag name.
- match parse_timestamp_to_millis(s) {
- Ok(timestamp_millis) => Ok(HashMap::from([(
- SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
- timestamp_millis.to_string(),
- )])),
- Err(_) => Ok(HashMap::from([(
- SCAN_TAG_NAME_OPTION.to_string(),
- s.clone(),
- )])),
- }
- }
_ => Err(datafusion::error::DataFusionError::Plan(format!(
- "Unsupported time travel expression: {expr}"
+ "Unsupported TIMESTAMP AS OF expression: {expr}. Expected a
timestamp string."
))),
},
_ => Err(datafusion::error::DataFusionError::Plan(format!(
- "Unsupported time travel expression: {expr}. Expected an integer
snapshot id, a timestamp string, or a tag name."
+ "Unsupported TIMESTAMP AS OF expression: {expr}. Expected a
timestamp string."
))),
}
}
diff --git a/crates/integrations/datafusion/tests/read_tables.rs
b/crates/integrations/datafusion/tests/read_tables.rs
index 4fc26d6..3b5df63 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -383,10 +383,10 @@ async fn test_missing_database_returns_no_schema() {
// ======================= Time Travel Tests =======================
/// Helper: create a SessionContext with catalog + relation planner for time
travel.
-/// Uses BigQuery dialect to enable `FOR SYSTEM_TIME AS OF` syntax.
+/// Uses Databricks dialect to enable `VERSION AS OF` and `TIMESTAMP AS OF`
syntax.
async fn create_time_travel_context() -> SessionContext {
let catalog = create_catalog();
- let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect",
"BigQuery");
+ let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect",
"Databricks");
let ctx = SessionContext::new_with_config(config);
ctx.register_catalog(
"paimon",
@@ -403,7 +403,7 @@ async fn test_time_travel_by_snapshot_id() {
// Snapshot 1: should contain only the first insert (alice, bob)
let batches = ctx
- .sql("SELECT id, name FROM paimon.default.time_travel_table FOR
SYSTEM_TIME AS OF 1")
+ .sql("SELECT id, name FROM paimon.default.time_travel_table VERSION AS
OF 1")
.await
.expect("time travel query should parse")
.collect()
@@ -420,7 +420,7 @@ async fn test_time_travel_by_snapshot_id() {
// Snapshot 2 (latest): should contain all rows
let batches = ctx
- .sql("SELECT id, name FROM paimon.default.time_travel_table FOR
SYSTEM_TIME AS OF 2")
+ .sql("SELECT id, name FROM paimon.default.time_travel_table VERSION AS
OF 2")
.await
.expect("time travel query should parse")
.collect()
@@ -443,11 +443,21 @@ async fn test_time_travel_by_snapshot_id() {
#[tokio::test]
async fn test_time_travel_by_tag_name() {
- let ctx = create_time_travel_context().await;
+ // Tag-based time travel uses `scan.version` option directly since
+ // `VERSION AS OF` in SQL only accepts numeric values.
+ let provider = create_provider_with_options(
+ "time_travel_table",
+ HashMap::from([("scan.version".to_string(), "snapshot1".to_string())]),
+ )
+ .await;
+
+ let ctx = SessionContext::new();
+ ctx.register_table("time_travel_table", Arc::new(provider))
+ .expect("Failed to register table");
// Tag 'snapshot1' points to snapshot 1: should contain only (alice, bob)
let batches = ctx
- .sql("SELECT id, name FROM paimon.default.time_travel_table FOR
SYSTEM_TIME AS OF 'snapshot1'")
+ .sql("SELECT id, name FROM time_travel_table")
.await
.expect("tag time travel query should parse")
.collect()
@@ -463,8 +473,18 @@ async fn test_time_travel_by_tag_name() {
);
// Tag 'snapshot2' points to snapshot 2: should contain all rows
- let batches = ctx
- .sql("SELECT id, name FROM paimon.default.time_travel_table FOR
SYSTEM_TIME AS OF 'snapshot2'")
+ let provider2 = create_provider_with_options(
+ "time_travel_table",
+ HashMap::from([("scan.version".to_string(), "snapshot2".to_string())]),
+ )
+ .await;
+
+ let ctx2 = SessionContext::new();
+ ctx2.register_table("time_travel_table", Arc::new(provider2))
+ .expect("Failed to register table");
+
+ let batches = ctx2
+ .sql("SELECT id, name FROM time_travel_table")
.await
.expect("tag time travel query should parse")
.collect()
@@ -489,11 +509,11 @@ async fn test_time_travel_by_tag_name() {
async fn test_time_travel_conflicting_selectors_fail() {
let provider = create_provider_with_options(
"time_travel_table",
- HashMap::from([("scan.tag-name".to_string(),
"snapshot1".to_string())]),
+ HashMap::from([("scan.timestamp-millis".to_string(),
"1234".to_string())]),
)
.await;
- let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect",
"BigQuery");
+ let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect",
"Databricks");
let ctx = SessionContext::new_with_config(config);
ctx.register_table("time_travel_table", Arc::new(provider))
.expect("Failed to register table");
@@ -501,7 +521,7 @@ async fn test_time_travel_conflicting_selectors_fail() {
.expect("Failed to register relation planner");
let err = ctx
- .sql("SELECT id, name FROM time_travel_table FOR SYSTEM_TIME AS OF 2")
+ .sql("SELECT id, name FROM time_travel_table VERSION AS OF 2")
.await
.expect("time travel query should parse")
.collect()
@@ -514,20 +534,16 @@ async fn test_time_travel_conflicting_selectors_fail() {
"unexpected conflict error: {message}"
);
assert!(
- message.contains("scan.snapshot-id"),
- "conflict error should mention scan.snapshot-id: {message}"
- );
- assert!(
- message.contains("scan.tag-name"),
- "conflict error should mention scan.tag-name: {message}"
+ message.contains("scan.version"),
+ "conflict error should mention scan.version: {message}"
);
}
#[tokio::test]
-async fn test_time_travel_invalid_numeric_selector_fails() {
+async fn test_time_travel_invalid_version_fails() {
let provider = create_provider_with_options(
"time_travel_table",
- HashMap::from([("scan.snapshot-id".to_string(),
"not-a-number".to_string())]),
+ HashMap::from([("scan.version".to_string(),
"nonexistent-tag".to_string())]),
)
.await;
@@ -541,12 +557,12 @@ async fn
test_time_travel_invalid_numeric_selector_fails() {
.expect("query should parse")
.collect()
.await
- .expect_err("invalid numeric time-travel selector should fail");
+ .expect_err("invalid version should fail");
let message = err.to_string();
assert!(
- message.contains("Invalid value for scan.snapshot-id"),
- "unexpected invalid selector error: {message}"
+ message.contains("is not a valid tag name or snapshot id"),
+ "unexpected invalid version error: {message}"
);
}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 908781f..7477a3b 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -68,7 +68,7 @@ arrow-select = { workspace = true }
futures = "0.3"
tokio-util = { workspace = true, features = ["compat"] }
parquet = { workspace = true, features = ["async", "zstd", "lz4", "snap"] }
-orc-rust = "0.7.0"
+orc-rust = "0.8.0"
async-stream = "0.3.6"
reqwest = { version = "0.12", features = ["json"] }
# DLF authentication dependencies
diff --git a/crates/paimon/src/spec/core_options.rs
b/crates/paimon/src/spec/core_options.rs
index bb52022..eb8da39 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -40,9 +40,8 @@ const DEFAULT_COMMIT_MAX_RETRIES: u32 = 10;
const DEFAULT_COMMIT_TIMEOUT_MS: u64 = 120_000;
const DEFAULT_COMMIT_MIN_RETRY_WAIT_MS: u64 = 1_000;
const DEFAULT_COMMIT_MAX_RETRY_WAIT_MS: u64 = 10_000;
-pub const SCAN_SNAPSHOT_ID_OPTION: &str = "scan.snapshot-id";
pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis";
-pub const SCAN_TAG_NAME_OPTION: &str = "scan.tag-name";
+pub const SCAN_VERSION_OPTION: &str = "scan.version";
const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
@@ -57,11 +56,12 @@ pub struct CoreOptions<'a> {
options: &'a HashMap<String, String>,
}
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum TimeTravelSelector<'a> {
- TagName(&'a str),
- SnapshotId(i64),
TimestampMillis(i64),
+ /// Raw version string from `VERSION AS OF`. Resolved at scan time:
+ /// tag name (if tag exists) → snapshot id (if parseable as i64) → error.
+ Version(&'a str),
}
impl<'a> CoreOptions<'a> {
@@ -138,17 +138,6 @@ impl<'a> CoreOptions<'a> {
}
}
- /// Raw snapshot id accessor for `scan.snapshot-id`.
- ///
- /// This compatibility accessor is lossy: it returns `None` for absent or
- /// invalid values and does not validate selector conflicts. Internal
- /// time-travel planning should use `try_time_travel_selector`.
- pub fn scan_snapshot_id(&self) -> Option<i64> {
- self.options
- .get(SCAN_SNAPSHOT_ID_OPTION)
- .and_then(|v| v.parse().ok())
- }
-
/// Raw timestamp accessor for `scan.timestamp-millis`.
///
/// This compatibility accessor is lossy: it returns `None` for absent or
@@ -160,25 +149,14 @@ impl<'a> CoreOptions<'a> {
.and_then(|v| v.parse().ok())
}
- /// Raw tag name accessor for `scan.tag-name`.
- ///
- /// This compatibility accessor does not validate selector conflicts.
- /// Internal time-travel planning should use `try_time_travel_selector`.
- pub fn scan_tag_name(&self) -> Option<&'a str> {
- self.options.get(SCAN_TAG_NAME_OPTION).map(String::as_str)
- }
-
fn configured_time_travel_selectors(&self) -> Vec<&'static str> {
- let mut selectors = Vec::with_capacity(3);
- if self.options.contains_key(SCAN_TAG_NAME_OPTION) {
- selectors.push(SCAN_TAG_NAME_OPTION);
- }
- if self.options.contains_key(SCAN_SNAPSHOT_ID_OPTION) {
- selectors.push(SCAN_SNAPSHOT_ID_OPTION);
- }
+ let mut selectors = Vec::with_capacity(2);
if self.options.contains_key(SCAN_TIMESTAMP_MILLIS_OPTION) {
selectors.push(SCAN_TIMESTAMP_MILLIS_OPTION);
}
+ if self.options.contains_key(SCAN_VERSION_OPTION) {
+ selectors.push(SCAN_VERSION_OPTION);
+ }
selectors
}
@@ -198,12 +176,10 @@ impl<'a> CoreOptions<'a> {
});
}
- if let Some(tag_name) = self.scan_tag_name() {
- Ok(Some(TimeTravelSelector::TagName(tag_name)))
- } else if let Some(id) =
self.parse_i64_option(SCAN_SNAPSHOT_ID_OPTION)? {
- Ok(Some(TimeTravelSelector::SnapshotId(id)))
- } else if let Some(ts) =
self.parse_i64_option(SCAN_TIMESTAMP_MILLIS_OPTION)? {
+ if let Some(ts) = self.parse_i64_option(SCAN_TIMESTAMP_MILLIS_OPTION)?
{
Ok(Some(TimeTravelSelector::TimestampMillis(ts)))
+ } else if let Some(version) =
self.options.get(SCAN_VERSION_OPTION).map(String::as_str) {
+ Ok(Some(TimeTravelSelector::Version(version)))
} else {
Ok(None)
}
@@ -408,8 +384,8 @@ mod tests {
#[test]
fn test_try_time_travel_selector_rejects_conflicting_selectors() {
let options = HashMap::from([
- (SCAN_TAG_NAME_OPTION.to_string(), "tag1".to_string()),
- (SCAN_SNAPSHOT_ID_OPTION.to_string(), "7".to_string()),
+ (SCAN_VERSION_OPTION.to_string(), "tag1".to_string()),
+ (SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "1234".to_string()),
]);
let core = CoreOptions::new(&options);
@@ -419,8 +395,8 @@ mod tests {
match err {
crate::Error::DataInvalid { message, .. } => {
assert!(message.contains("Only one time-travel selector may be
set"));
- assert!(message.contains(SCAN_TAG_NAME_OPTION));
- assert!(message.contains(SCAN_SNAPSHOT_ID_OPTION));
+ assert!(message.contains(SCAN_VERSION_OPTION));
+ assert!(message.contains(SCAN_TIMESTAMP_MILLIS_OPTION));
}
other => panic!("unexpected error: {other:?}"),
}
@@ -428,20 +404,6 @@ mod tests {
#[test]
fn test_try_time_travel_selector_rejects_invalid_numeric_values() {
- let snapshot_options =
- HashMap::from([(SCAN_SNAPSHOT_ID_OPTION.to_string(),
"abc".to_string())]);
- let snapshot_core = CoreOptions::new(&snapshot_options);
-
- let snapshot_err = snapshot_core
- .try_time_travel_selector()
- .expect_err("invalid snapshot id should fail");
- match snapshot_err {
- crate::Error::DataInvalid { message, .. } => {
- assert!(message.contains(SCAN_SNAPSHOT_ID_OPTION));
- }
- other => panic!("unexpected error: {other:?}"),
- }
-
let timestamp_options =
HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
"xyz".to_string())]);
let timestamp_core = CoreOptions::new(&timestamp_options);
@@ -490,31 +452,34 @@ mod tests {
#[test]
fn test_try_time_travel_selector_normalizes_valid_selector() {
- let tag_options = HashMap::from([(SCAN_TAG_NAME_OPTION.to_string(),
"tag1".to_string())]);
- let tag_core = CoreOptions::new(&tag_options);
+ let timestamp_options =
+ HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
"1234".to_string())]);
+ let timestamp_core = CoreOptions::new(&timestamp_options);
assert_eq!(
- tag_core.try_time_travel_selector().expect("tag selector"),
- Some(TimeTravelSelector::TagName("tag1"))
+ timestamp_core
+ .try_time_travel_selector()
+ .expect("timestamp selector"),
+ Some(TimeTravelSelector::TimestampMillis(1234))
);
- let snapshot_options =
- HashMap::from([(SCAN_SNAPSHOT_ID_OPTION.to_string(),
"7".to_string())]);
- let snapshot_core = CoreOptions::new(&snapshot_options);
+ let version_options =
+ HashMap::from([(SCAN_VERSION_OPTION.to_string(),
"my-tag".to_string())]);
+ let version_core = CoreOptions::new(&version_options);
assert_eq!(
- snapshot_core
+ version_core
.try_time_travel_selector()
- .expect("snapshot selector"),
- Some(TimeTravelSelector::SnapshotId(7))
+ .expect("version selector"),
+ Some(TimeTravelSelector::Version("my-tag"))
);
- let timestamp_options =
- HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
"1234".to_string())]);
- let timestamp_core = CoreOptions::new(&timestamp_options);
+ let version_num_options =
+ HashMap::from([(SCAN_VERSION_OPTION.to_string(),
"3".to_string())]);
+ let version_num_core = CoreOptions::new(&version_num_options);
assert_eq!(
- timestamp_core
+ version_num_core
.try_time_travel_selector()
- .expect("timestamp selector"),
- Some(TimeTravelSelector::TimestampMillis(1234))
+ .expect("version numeric selector"),
+ Some(TimeTravelSelector::Version("3"))
);
}
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index 1d2f621..a21b214 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -345,9 +345,8 @@ impl<'a> TableScan<'a> {
/// Plan the full scan: resolve snapshot (via options or latest), then
read manifests and build DataSplits.
///
/// Time travel is resolved from table options:
- /// - only one of `scan.tag-name`, `scan.snapshot-id`,
`scan.timestamp-millis` may be set
- /// - `scan.tag-name` → read the snapshot pinned by that tag
- /// - `scan.snapshot-id` → read that specific snapshot
+ /// - only one of `scan.version`, `scan.timestamp-millis` may be set
+ /// - `scan.version` → tag name (if exists) → snapshot id (if parseable) →
error
/// - `scan.timestamp-millis` → find the latest snapshot <= that timestamp
/// - otherwise → read the latest snapshot
///
@@ -367,19 +366,6 @@ impl<'a> TableScan<'a> {
let core_options = CoreOptions::new(self.table.schema().options());
match core_options.try_time_travel_selector()? {
- Some(TimeTravelSelector::TagName(tag_name)) => {
- let tag_manager = TagManager::new(file_io.clone(),
table_path.to_string());
- match tag_manager.get(tag_name).await? {
- Some(s) => Ok(Some(s)),
- None => Err(Error::DataInvalid {
- message: format!("Tag '{tag_name}' doesn't exist."),
- source: None,
- }),
- }
- }
- Some(TimeTravelSelector::SnapshotId(id)) => {
- snapshot_manager.get_snapshot(id).await.map(Some)
- }
Some(TimeTravelSelector::TimestampMillis(ts)) => {
match snapshot_manager.earlier_or_equal_time_mills(ts).await? {
Some(s) => Ok(Some(s)),
@@ -389,6 +375,26 @@ impl<'a> TableScan<'a> {
}),
}
}
+ Some(TimeTravelSelector::Version(v)) => {
+ // Tag first, then snapshot id, else error.
+ let tag_manager = TagManager::new(file_io.clone(),
table_path.to_string());
+ if tag_manager.tag_exists(v).await? {
+ match tag_manager.get(v).await? {
+ Some(s) => Ok(Some(s)),
+ None => Err(Error::DataInvalid {
+ message: format!("Tag '{v}' doesn't exist."),
+ source: None,
+ }),
+ }
+ } else if let Ok(id) = v.parse::<i64>() {
+ snapshot_manager.get_snapshot(id).await.map(Some)
+ } else {
+ Err(Error::DataInvalid {
+ message: format!("Version '{v}' is not a valid tag
name or snapshot id."),
+ source: None,
+ })
+ }
+ }
None => snapshot_manager.get_latest_snapshot().await,
}
}
diff --git a/crates/test_utils.rs b/crates/test_utils.rs
index cc88192..4813cc5 100644
--- a/crates/test_utils.rs
+++ b/crates/test_utils.rs
@@ -45,7 +45,7 @@ pub(crate) fn write_int_parquet_file(
let props = max_row_group_size.map(|size| {
WriterProperties::builder()
- .set_max_row_group_size(size)
+ .set_max_row_group_row_count(Some(size))
.build()
});
let file = File::create(path).unwrap();
diff --git a/docs/src/datafusion.md b/docs/src/datafusion.md
index 18930b6..df8c4b8 100644
--- a/docs/src/datafusion.md
+++ b/docs/src/datafusion.md
@@ -27,7 +27,7 @@ under the License.
[dependencies]
paimon = "0.1.0"
paimon-datafusion = "0.1.0"
-datafusion = "52"
+datafusion = "53"
tokio = { version = "1", features = ["full"] }
```
@@ -49,41 +49,43 @@ df.show().await?;
## Time Travel
-Paimon supports time travel queries to read historical data. In DataFusion,
this is done via the `FOR SYSTEM_TIME AS OF` clause.
+Paimon supports time travel queries to read historical data. In DataFusion,
this is done via the `VERSION AS OF` and `TIMESTAMP AS OF` clauses.
### By Snapshot ID
-Read data from a specific snapshot by passing an integer literal:
+Read data from a specific snapshot by passing an integer:
```sql
-SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 1
+SELECT * FROM paimon.default.my_table VERSION AS OF 1
```
-This sets the `scan.snapshot-id` option and reads exactly that snapshot.
+This sets the `scan.version` option. At scan time, the value is resolved as a
snapshot ID.
-### By Timestamp
+### By Tag Name
-Read data as of a specific point in time by passing a timestamp string in
`YYYY-MM-DD HH:MM:SS` format:
+Read data from a named tag. Since `VERSION AS OF` in SQL only accepts numeric
values, tag-based time travel is done via the `scan.version` table option:
-```sql
-SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF '2024-01-01
00:00:00'
+```rust
+let table = table.copy_with_options(HashMap::from([
+ ("scan.version".to_string(), "my_tag".to_string()),
+]));
```
-This finds the latest snapshot whose commit time is less than or equal to the
given timestamp. The timestamp is interpreted in the local timezone.
+At scan time, the version value is resolved by first checking if a tag with
that name exists, then trying to parse it as a snapshot ID. If neither matches,
an error is returned.
-### By Tag Name
+### By Timestamp
-Read data from a named tag by passing a string that is not a timestamp:
+Read data as of a specific point in time by passing a timestamp string in
`YYYY-MM-DD HH:MM:SS` format:
```sql
-SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 'my_tag'
+SELECT * FROM paimon.default.my_table TIMESTAMP AS OF '2024-01-01 00:00:00'
```
-Tags are named snapshots created via Paimon's tag management (e.g., `CALL
sys.create_tag(...)` in Spark). This is useful for pinning a stable version of
the data for reproducible queries.
+This finds the latest snapshot whose commit time is less than or equal to the
given timestamp. The timestamp is interpreted in the local timezone.
### Enabling Time Travel Syntax
-DataFusion requires the BigQuery SQL dialect to parse `FOR SYSTEM_TIME AS OF`.
You also need to register the `PaimonRelationPlanner`:
+DataFusion requires the Databricks SQL dialect to parse `VERSION AS OF` and
`TIMESTAMP AS OF`. You also need to register the `PaimonRelationPlanner`:
```rust
use std::sync::Arc;
@@ -91,12 +93,12 @@ use datafusion::prelude::{SessionConfig, SessionContext};
use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner};
let config = SessionConfig::new()
- .set_str("datafusion.sql_parser.dialect", "BigQuery");
+ .set_str("datafusion.sql_parser.dialect", "Databricks");
let ctx = SessionContext::new_with_config(config);
ctx.register_catalog("paimon",
Arc::new(PaimonCatalogProvider::new(Arc::new(catalog))));
ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))?;
// Now time travel queries work
-let df = ctx.sql("SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF
1").await?;
+let df = ctx.sql("SELECT * FROM paimon.default.my_table VERSION AS OF
1").await?;
```