This is an automated email from the ASF dual-hosted git repository.
hope pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9478259 feat: introduce time travel for data fusion (#195)
9478259 is described below
commit 9478259467002b9976f982ff2ad53aa7fec5aac9
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 3 15:21:29 2026 +0800
feat: introduce time travel for data fusion (#195)
* feat: introduce time travel for data fusion
---
crates/integrations/datafusion/Cargo.toml | 2 +
crates/integrations/datafusion/src/lib.rs | 2 +
.../datafusion/src/relation_planner.rs | 189 +++++++++++++++++++++
.../integrations/datafusion/tests/read_tables.rs | 63 ++++++-
crates/paimon/src/spec/core_options.rs | 16 ++
crates/paimon/src/spec/schema.rs | 7 +
crates/paimon/src/table/mod.rs | 11 ++
crates/paimon/src/table/snapshot_manager.rs | 175 +++++++++++++++----
crates/paimon/src/table/table_scan.rs | 30 +++-
dev/spark/provision.py | 26 +++
10 files changed, 483 insertions(+), 38 deletions(-)
diff --git a/crates/integrations/datafusion/Cargo.toml
b/crates/integrations/datafusion/Cargo.toml
index c9673ec..e6ac2d3 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -26,9 +26,11 @@ keywords = ["paimon", "datafusion", "integrations"]
[dependencies]
async-trait = "0.1"
+chrono = "0.4"
datafusion = { version = "52.3.0"}
paimon = { path = "../../paimon" }
futures = "0.3"
[dev-dependencies]
+serde_json = "1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/crates/integrations/datafusion/src/lib.rs
b/crates/integrations/datafusion/src/lib.rs
index 40639e2..cd73831 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -40,11 +40,13 @@ mod catalog;
mod error;
mod filter_pushdown;
mod physical_plan;
+mod relation_planner;
mod schema;
mod table;
pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
pub use error::to_datafusion_error;
pub use physical_plan::PaimonTableScan;
+pub use relation_planner::PaimonRelationPlanner;
pub use schema::paimon_schema_to_arrow;
pub use table::PaimonTableProvider;
diff --git a/crates/integrations/datafusion/src/relation_planner.rs
b/crates/integrations/datafusion/src/relation_planner.rs
new file mode 100644
index 0000000..1425a5b
--- /dev/null
+++ b/crates/integrations/datafusion/src/relation_planner.rs
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Custom [`RelationPlanner`] for Paimon time travel via `FOR SYSTEM_TIME AS OF`.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use datafusion::catalog::default_table_source::{provider_as_source,
source_as_provider};
+use datafusion::common::TableReference;
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::builder::LogicalPlanBuilder;
+use datafusion::logical_expr::planner::{
+ PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
+};
+use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion};
+use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TIMESTAMP_MILLIS_OPTION};
+
+use crate::table::PaimonTableProvider;
+
+/// A [`RelationPlanner`] that intercepts `FOR SYSTEM_TIME AS OF` clauses on
+/// Paimon tables and turns them into time-travel scan options.
+///
+/// - Integer literal → sets the `scan.snapshot-id` option on the table.
+/// - String literal → parsed as a local timestamp, sets the
+///   `scan.timestamp-millis` option.
+///
+/// Relations that are not Paimon tables, or that have no
+/// `FOR SYSTEM_TIME AS OF` clause, are returned unplanned.
+#[derive(Debug, Default)]
+pub struct PaimonRelationPlanner;
+
+impl PaimonRelationPlanner {
+    /// Creates the planner. It carries no state.
+    pub fn new() -> Self {
+        PaimonRelationPlanner
+    }
+}
+
+impl RelationPlanner for PaimonRelationPlanner {
+    /// Plans `<table> FOR SYSTEM_TIME AS OF <expr>` on a Paimon table as a scan
+    /// of a copy of that table carrying the matching time-travel option.
+    ///
+    /// Returns [`RelationPlanning::Original`] unchanged when the factor is not a
+    /// plain table, has no `FOR SYSTEM_TIME AS OF` clause, or does not resolve to
+    /// a [`PaimonTableProvider`].
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        context: &mut dyn RelationPlannerContext,
+    ) -> DFResult<RelationPlanning> {
+        // Bind by reference so `relation` stays intact and can be handed back
+        // untouched on every fall-through path.
+        let TableFactor::Table {
+            ref name,
+            version: Some(TableVersion::ForSystemTimeAsOf(ref version_expr)),
+            ref alias,
+            ..
+        } = relation
+        else {
+            return Ok(RelationPlanning::Original(relation));
+        };
+
+        let table_ref = object_name_to_table_reference(name, context)?;
+        let table_source = context
+            .context_provider()
+            .get_table_source(table_ref.clone())?;
+        let table_provider = source_as_provider(&table_source)?;
+        let Some(paimon) = table_provider
+            .as_any()
+            .downcast_ref::<PaimonTableProvider>()
+        else {
+            // Not a Paimon table: leave the clause for other planners.
+            return Ok(RelationPlanning::Original(relation));
+        };
+
+        let travelled_table = paimon
+            .table()
+            .copy_with_options(resolve_time_travel_options(version_expr)?);
+        let travelled_source =
+            provider_as_source(Arc::new(PaimonTableProvider::try_new(travelled_table)?));
+        let plan = LogicalPlanBuilder::scan(table_ref, travelled_source, None)?.build()?;
+
+        Ok(RelationPlanning::Planned(PlannedRelation::new(
+            plan,
+            alias.clone(),
+        )))
+    }
+}
+
+/// Converts a sqlparser [`ast::ObjectName`] into a DataFusion [`TableReference`].
+///
+/// Every name part must be a simple identifier. Each one is passed through
+/// `context.normalize_ident`. One, two or three parts produce a bare, partial or
+/// full reference respectively.
+///
+/// # Errors
+///
+/// Returns a plan error if a part is not a simple identifier or the name has
+/// more than three parts.
+fn object_name_to_table_reference(
+    name: &ast::ObjectName,
+    context: &mut dyn RelationPlannerContext,
+) -> DFResult<TableReference> {
+    let idents = name
+        .0
+        .iter()
+        .map(|part| {
+            let ident = part.as_ident().ok_or_else(|| {
+                datafusion::error::DataFusionError::Plan(format!(
+                    "Expected simple identifier in table reference, got: {part}"
+                ))
+            })?;
+            Ok(context.normalize_ident(ident.clone()))
+        })
+        .collect::<DFResult<Vec<String>>>()?;
+
+    match idents.as_slice() {
+        [table] => Ok(TableReference::bare(table.clone())),
+        [schema, table] => Ok(TableReference::partial(schema.clone(), table.clone())),
+        [catalog, schema, table] => Ok(TableReference::full(
+            catalog.clone(),
+            schema.clone(),
+            table.clone(),
+        )),
+        _ => Err(datafusion::error::DataFusionError::Plan(format!(
+            "Unsupported table reference: {name}"
+        ))),
+    }
+}
+
+/// Resolve `FOR SYSTEM_TIME AS OF <expr>` into table options.
+///
+/// - Integer literal → `{"scan.snapshot-id": "N"}`
+/// - String literal (timestamp) → parse to millis →
`{"scan.timestamp-millis": "M"}`
+fn resolve_time_travel_options(expr: &ast::Expr) -> DFResult<HashMap<String,
String>> {
+ match expr {
+ ast::Expr::Value(v) => match &v.value {
+ ast::Value::Number(n, _) => {
+ // Validate it's a valid integer
+ n.parse::<i64>().map_err(|e| {
+ datafusion::error::DataFusionError::Plan(format!(
+ "Invalid snapshot id '{n}': {e}"
+ ))
+ })?;
+ Ok(HashMap::from([(
+ SCAN_SNAPSHOT_ID_OPTION.to_string(),
+ n.clone(),
+ )]))
+ }
+ ast::Value::SingleQuotedString(s) |
ast::Value::DoubleQuotedString(s) => {
+ let timestamp_millis = parse_timestamp_to_millis(s)?;
+ Ok(HashMap::from([(
+ SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
+ timestamp_millis.to_string(),
+ )]))
+ }
+ _ => Err(datafusion::error::DataFusionError::Plan(format!(
+ "Unsupported time travel expression: {expr}"
+ ))),
+ },
+ _ => Err(datafusion::error::DataFusionError::Plan(format!(
+ "Unsupported time travel expression: {expr}. Expected an integer
snapshot id or a timestamp string."
+ ))),
+ }
+}
+
+/// Parse a timestamp string to milliseconds since epoch (using local
timezone).
+///
+/// Matches Java Paimon's behavior which uses `TimeZone.getDefault()`.
+fn parse_timestamp_to_millis(ts: &str) -> DFResult<i64> {
+ use chrono::{Local, NaiveDateTime, TimeZone};
+
+ let naive = NaiveDateTime::parse_from_str(ts, "%Y-%m-%d
%H:%M:%S").map_err(|e| {
+ datafusion::error::DataFusionError::Plan(format!(
+ "Cannot parse time travel timestamp '{ts}': {e}. Expected format:
YYYY-MM-DD HH:MM:SS"
+ ))
+ })?;
+ let local = Local.from_local_datetime(&naive).single().ok_or_else(|| {
+ datafusion::error::DataFusionError::Plan(format!("Ambiguous or invalid
local time: '{ts}'"))
+ })?;
+ Ok(local.timestamp_millis())
+}
diff --git a/crates/integrations/datafusion/tests/read_tables.rs
b/crates/integrations/datafusion/tests/read_tables.rs
index 89f0dd0..8f831d7 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -24,7 +24,7 @@ use datafusion::logical_expr::{col, lit,
TableProviderFilterPushDown};
use datafusion::prelude::{SessionConfig, SessionContext};
use paimon::catalog::Identifier;
use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
-use paimon_datafusion::{PaimonCatalogProvider, PaimonTableProvider};
+use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner,
PaimonTableProvider};
fn get_test_warehouse() -> String {
std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_|
"/tmp/paimon-warehouse".to_string())
@@ -363,3 +363,64 @@ async fn test_missing_database_returns_no_schema() {
"missing databases should not resolve to a schema provider"
);
}
+
+// ======================= Time Travel Tests =======================
+
+/// Builds a [`SessionContext`] for the time-travel tests. The Paimon catalog is
+/// registered as `paimon`, and [`PaimonRelationPlanner`] is installed.
+///
+/// The SQL dialect is set to BigQuery to enable `FOR SYSTEM_TIME AS OF` syntax.
+async fn create_time_travel_context() -> SessionContext {
+    let provider = PaimonCatalogProvider::new(Arc::new(create_catalog()));
+    let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", "BigQuery");
+    let session = SessionContext::new_with_config(config);
+    session.register_catalog("paimon", Arc::new(provider));
+    session
+        .register_relation_planner(Arc::new(PaimonRelationPlanner::new()))
+        .expect("Failed to register relation planner");
+    session
+}
+
+/// Time travel by snapshot id: each snapshot exposes exactly the rows committed
+/// up to and including it.
+#[tokio::test]
+async fn test_time_travel_by_snapshot_id() {
+    let ctx = create_time_travel_context().await;
+
+    let cases = [
+        (
+            1,
+            vec![(1, "alice".to_string()), (2, "bob".to_string())],
+            "Snapshot 1 should contain only the first batch of rows",
+        ),
+        (
+            2,
+            vec![
+                (1, "alice".to_string()),
+                (2, "bob".to_string()),
+                (3, "carol".to_string()),
+                (4, "dave".to_string()),
+            ],
+            "Snapshot 2 should contain all rows",
+        ),
+    ];
+
+    for (snapshot_id, expected, message) in cases {
+        let sql = format!(
+            "SELECT id, name FROM paimon.default.time_travel_table FOR SYSTEM_TIME AS OF {snapshot_id}"
+        );
+        let batches = ctx
+            .sql(&sql)
+            .await
+            .expect("time travel query should parse")
+            .collect()
+            .await
+            .expect("time travel query should execute");
+
+        let mut rows = extract_id_name_rows(&batches);
+        rows.sort_by_key(|(id, _)| *id);
+        assert_eq!(rows, expected, "{message}");
+    }
+}
+
+/// Time travel by timestamp string: a far-future timestamp resolves to the
+/// latest snapshot, and a timestamp before any commit is rejected.
+#[tokio::test]
+async fn test_time_travel_by_timestamp() {
+    let ctx = create_time_travel_context().await;
+
+    let batches = ctx
+        .sql("SELECT id, name FROM paimon.default.time_travel_table FOR SYSTEM_TIME AS OF '2999-01-01 00:00:00'")
+        .await
+        .expect("time travel query should parse")
+        .collect()
+        .await
+        .expect("time travel query should execute");
+    let mut rows = extract_id_name_rows(&batches);
+    rows.sort_by_key(|(id, _)| *id);
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".to_string()),
+            (2, "bob".to_string()),
+            (3, "carol".to_string()),
+            (4, "dave".to_string()),
+        ],
+        "A future timestamp should resolve to the latest snapshot"
+    );
+
+    let result = match ctx
+        .sql("SELECT id, name FROM paimon.default.time_travel_table FOR SYSTEM_TIME AS OF '1970-01-01 00:00:00'")
+        .await
+    {
+        Ok(df) => df.collect().await.map(|_| ()),
+        Err(e) => Err(e),
+    };
+    assert!(
+        result.is_err(),
+        "A timestamp earlier than every snapshot should fail"
+    );
+}
diff --git a/crates/paimon/src/spec/core_options.rs
b/crates/paimon/src/spec/core_options.rs
index 0f15922..aeae23c 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -23,6 +23,8 @@ const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str =
"source.split.target-size";
const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost";
const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
+/// Table option key naming the snapshot id to read (time travel by snapshot).
+pub const SCAN_SNAPSHOT_ID_OPTION: &str = "scan.snapshot-id";
+/// Table option key holding an epoch-millis timestamp; the scan reads the latest
+/// snapshot committed at or before it (time travel by timestamp).
+pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis";
const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
@@ -88,6 +90,20 @@ impl<'a> CoreOptions<'a> {
.map(|v| v.eq_ignore_ascii_case("true"))
.unwrap_or(true)
}
+
+    /// Snapshot id for time travel, read from the `scan.snapshot-id` option.
+    ///
+    /// Returns `None` when the option is absent. A value that does not parse as
+    /// an `i64` is also reported as `None`, i.e. it is silently ignored.
+    pub fn scan_snapshot_id(&self) -> Option<i64> {
+        let raw = self.options.get(SCAN_SNAPSHOT_ID_OPTION)?;
+        raw.parse::<i64>().ok()
+    }
+
+    /// Epoch-millis timestamp for time travel, read from the
+    /// `scan.timestamp-millis` option.
+    ///
+    /// Returns `None` when the option is absent or does not parse as an `i64`.
+    pub fn scan_timestamp_millis(&self) -> Option<i64> {
+        let raw = self.options.get(SCAN_TIMESTAMP_MILLIS_OPTION)?;
+        raw.parse::<i64>().ok()
+    }
}
/// Parse a memory size string to bytes using binary (1024-based) semantics.
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index 06cdd11..c1047ad 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -94,6 +94,13 @@ impl TableSchema {
&self.options
}
+    /// Returns a clone of this schema whose options also contain `extra`.
+    /// Entries in `extra` replace existing options with the same key.
+    pub fn copy_with_options(&self, extra: HashMap<String, String>) -> Self {
+        let mut copied = self.clone();
+        for (key, value) in extra {
+            copied.options.insert(key, value);
+        }
+        copied
+    }
+
pub fn comment(&self) -> Option<&str> {
self.comment.as_deref()
}
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 7c3f9a5..d936d7a 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -34,6 +34,7 @@ pub use table_scan::TableScan;
use crate::catalog::Identifier;
use crate::io::FileIO;
use crate::spec::TableSchema;
+use std::collections::HashMap;
/// Table represents a table in the catalog.
#[derive(Debug, Clone)]
@@ -87,6 +88,16 @@ impl Table {
pub fn new_read_builder(&self) -> ReadBuilder<'_> {
ReadBuilder::new(self)
}
+
+    /// Returns a copy of this table (same file IO, identifier and location) whose
+    /// schema options are extended with `extra`. Keys in `extra` take precedence.
+    pub fn copy_with_options(&self, extra: HashMap<String, String>) -> Self {
+        let schema = self.schema.copy_with_options(extra);
+        Self {
+            schema,
+            location: self.location.clone(),
+            identifier: self.identifier.clone(),
+            file_io: self.file_io.clone(),
+        }
+    }
}
/// A stream of arrow [`RecordBatch`]es.
diff --git a/crates/paimon/src/table/snapshot_manager.rs
b/crates/paimon/src/table/snapshot_manager.rs
index 4f979a5..6d8eab2 100644
--- a/crates/paimon/src/table/snapshot_manager.rs
+++ b/crates/paimon/src/table/snapshot_manager.rs
@@ -18,15 +18,14 @@
//! Snapshot manager for reading snapshot metadata using FileIO.
//!
//!
Reference:[org.apache.paimon.utils.SnapshotManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java).
-// TODO: remove when SnapshotManager is used (e.g. from Table or source
planning).
-#![allow(dead_code)]
-
use crate::io::FileIO;
use crate::spec::Snapshot;
use std::str;
const SNAPSHOT_DIR: &str = "snapshot";
-const LATEST_SNAPSHOT_FILE: &str = "LATEST";
+/// File-name prefix of snapshot files (`snapshot-{id}`) inside the snapshot dir.
+const SNAPSHOT_PREFIX: &str = "snapshot-";
+/// Hint file holding a candidate latest snapshot id (validated before use).
+const LATEST_HINT: &str = "LATEST";
+/// Hint file holding a candidate earliest snapshot id (validated before use).
+const EARLIEST_HINT: &str = "EARLIEST";
/// Manager for snapshot files using unified FileIO.
///
@@ -51,9 +50,14 @@ impl SnapshotManager {
format!("{}/{}", self.table_path, SNAPSHOT_DIR)
}
- /// Path to the LATEST file that stores the latest snapshot id.
- pub fn latest_file_path(&self) -> String {
- format!("{}/{}", self.snapshot_dir(), LATEST_SNAPSHOT_FILE)
+ /// Path to the LATEST hint file (`{snapshot_dir}/LATEST`).
+ ///
+ /// Its content is a snapshot id as text; it is only a hint, checked by
+ /// `get_latest_snapshot_id` before being trusted.
+ fn latest_hint_path(&self) -> String {
+ format!("{}/{}", self.snapshot_dir(), LATEST_HINT)
+ }
+
+ /// Path to the EARLIEST hint file (`{snapshot_dir}/EARLIEST`).
+ ///
+ /// Its content is a snapshot id as text; it is only a hint, checked by
+ /// `earliest_snapshot_id` before being trusted.
+ fn earliest_hint_path(&self) -> String {
+ format!("{}/{}", self.snapshot_dir(), EARLIEST_HINT)
}
/// Path to the snapshot file for the given id (e.g.
`snapshot/snapshot-1`).
@@ -61,36 +65,87 @@ impl SnapshotManager {
format!("{}/snapshot-{}", self.snapshot_dir(), snapshot_id)
}
- /// Get the latest snapshot, or None if LATEST does not exist.
- /// Returns an error if LATEST exists but the snapshot file
(snapshot-{id}) does not exist.
- pub async fn get_latest_snapshot(&self) -> crate::Result<Option<Snapshot>>
{
- // todo: consider snapshot loader to load snapshot from catalog
- let latest_path = self.latest_file_path();
- let input = self.file_io.new_input(&latest_path)?;
- if !input.exists().await? {
- // todo: may need to list directory and find the latest snapshot
- return Ok(None);
+ /// Read a hint file and return the id, or None if the file does not exist,
+ /// is being deleted, or contains invalid content.
+ ///
+ /// Reference:
[HintFileUtils.readHint](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java)
+ async fn read_hint(&self, path: &str) -> Option<i64> {
+ let input = self.file_io.new_input(path).ok()?;
+ // Try to read directly without exists() check to avoid TOCTOU race.
+ // The file may be deleted or overwritten concurrently.
+ let content = input.read().await.ok()?;
+ let id_str = str::from_utf8(&content).ok()?;
+ id_str.trim().parse().ok()
+ }
+
+ /// List the snapshot directory and fold the ids of all `snapshot-{id}` files
+ /// with `reducer` (callers pass `i64::min` or `i64::max`).
+ ///
+ /// Directory entries and names whose suffix is not a valid `i64` are skipped.
+ /// Returns `Ok(None)` when no snapshot file is found.
+ // NOTE(review): a freshly created table may have no `snapshot/` directory yet;
+ // confirm `list_status` yields an empty listing (not an error) in that case,
+ // otherwise planning an empty table fails instead of returning no splits.
+ async fn find_by_list_files(&self, reducer: fn(i64, i64) -> i64) ->
crate::Result<Option<i64>> {
+ let snapshot_dir = self.snapshot_dir();
+ let statuses = self.file_io.list_status(&snapshot_dir).await?;
+ let mut result: Option<i64> = None;
+ for status in statuses {
+ if status.is_dir {
+ continue;
+ }
+ // File name = last path segment (`rsplit` always yields at least one item).
+ let name = status.path.rsplit('/').next().unwrap_or(&status.path);
+ if let Some(id_str) = name.strip_prefix(SNAPSHOT_PREFIX) {
+ if let Ok(id) = id_str.parse::<i64>() {
+ result = Some(match result {
+ Some(r) => reducer(r, id),
+ None => id,
+ });
+ }
+ }
}
- let content = input.read().await?;
- let id_str = str::from_utf8(&content).map_err(|e|
crate::Error::DataInvalid {
- message: "LATEST snapshot file invalid utf8".to_string(),
- source: Some(Box::new(e)),
- })?;
- let snapshot_id: i64 = id_str
- .trim()
- .parse()
- .map_err(|e| crate::Error::DataInvalid {
- message: format!("LATEST snapshot id not a number:
{id_str:?}"),
- source: Some(Box::new(e)),
- })?;
+ Ok(result)
+ }
+
+ /// Get the latest snapshot id.
+ ///
+ /// First tries the LATEST hint file. If the hint is valid and no next
snapshot
+ /// exists, returns it. Otherwise falls back to listing snapshot files.
+ ///
+ /// Reference:
[HintFileUtils.findLatest](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java)
+ pub async fn get_latest_snapshot_id(&self) -> crate::Result<Option<i64>> {
+ let hint_path = self.latest_hint_path();
+ if let Some(hint_id) = self.read_hint(&hint_path).await {
+ if hint_id > 0 {
+ let next_path = self.snapshot_path(hint_id + 1);
+ let next_input = self.file_io.new_input(&next_path)?;
+ if !next_input.exists().await? {
+ return Ok(Some(hint_id));
+ }
+ }
+ }
+ self.find_by_list_files(i64::max).await
+ }
+
+ /// Get the earliest snapshot id.
+ ///
+ /// First tries the EARLIEST hint file. If the hint is valid and the
snapshot
+ /// file exists, returns it. Otherwise falls back to listing snapshot
files.
+ ///
+ /// Reference:
[HintFileUtils.findEarliest](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java)
+ pub async fn earliest_snapshot_id(&self) -> crate::Result<Option<i64>> {
+ let hint_path = self.earliest_hint_path();
+ if let Some(hint_id) = self.read_hint(&hint_path).await {
+ let snap_path = self.snapshot_path(hint_id);
+ let snap_input = self.file_io.new_input(&snap_path)?;
+ if snap_input.exists().await? {
+ return Ok(Some(hint_id));
+ }
+ }
+ self.find_by_list_files(i64::min).await
+ }
+
+ /// Get a snapshot by id. Returns an error if the snapshot file does not exist
+ /// or contains a different snapshot id.
+ pub async fn get_snapshot(&self, snapshot_id: i64) ->
crate::Result<Snapshot> {
let snapshot_path = self.snapshot_path(snapshot_id);
let snap_input = self.file_io.new_input(&snapshot_path)?;
if !snap_input.exists().await? {
return Err(crate::Error::DataInvalid {
- message: format!(
- "snapshot file does not exist: {snapshot_path} (LATEST
points to snapshot id {snapshot_id})"
- ),
- source: None
+ message: format!("snapshot file does not exist:
{snapshot_path}"),
+ source: None,
});
}
let snap_bytes = snap_input.read().await?;
@@ -102,12 +157,66 @@ impl SnapshotManager {
if snapshot.id() != snapshot_id {
return Err(crate::Error::DataInvalid {
message: format!(
- "snapshot file id mismatch: LATEST points to
{snapshot_id}, but file contains snapshot id {}",
+ "snapshot file id mismatch: in file name is {snapshot_id},
but file contains snapshot id {}",
snapshot.id()
),
source: None
});
}
+ Ok(snapshot)
+ }
+
+ /// Get the latest snapshot, or `None` if no snapshot exists.
+ ///
+ /// The id comes from `get_latest_snapshot_id`; the snapshot is then loaded with
+ /// `get_snapshot`, so a missing or mismatched snapshot file is an error.
+ pub async fn get_latest_snapshot(&self) -> crate::Result<Option<Snapshot>>
{
+ let snapshot_id = match self.get_latest_snapshot_id().await? {
+ Some(id) => id,
+ None => return Ok(None),
+ };
+ let snapshot = self.get_snapshot(snapshot_id).await?;
Ok(Some(snapshot))
}
+
+ /// Returns the snapshot whose commit time is earlier than or equal to the
given
+ /// `timestamp_millis`. If no such snapshot exists, returns None.
+ ///
+ /// Uses binary search over snapshot IDs (assumes monotonically increasing
commit times).
+ ///
+ /// Reference:
[SnapshotManager.earlierOrEqualTimeMills](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java)
+ pub async fn earlier_or_equal_time_mills(
+ &self,
+ timestamp_millis: i64,
+ ) -> crate::Result<Option<Snapshot>> {
+ let mut latest = match self.get_latest_snapshot_id().await? {
+ Some(id) => id,
+ None => return Ok(None),
+ };
+
+ let earliest_snapshot = match self.earliest_snapshot_id().await? {
+ Some(id) => self.get_snapshot(id).await?,
+ None => return Ok(None),
+ };
+
+ // If the earliest snapshot is already after the timestamp, no match.
+ if (earliest_snapshot.time_millis() as i64) > timestamp_millis {
+ return Ok(None);
+ }
+ let mut earliest = earliest_snapshot.id();
+
+ let mut result: Option<Snapshot> = None;
+ while earliest <= latest {
+ let mid = earliest + (latest - earliest) / 2;
+ let snapshot = self.get_snapshot(mid).await?;
+ let commit_time = snapshot.time_millis() as i64;
+ if commit_time > timestamp_millis {
+ latest = mid - 1;
+ } else if commit_time < timestamp_millis {
+ earliest = mid + 1;
+ result = Some(snapshot);
+ } else {
+ result = Some(snapshot);
+ break;
+ }
+ }
+ Ok(result)
+ }
}
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index 3988a17..027dd75 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -240,15 +240,37 @@ impl<'a> TableScan<'a> {
}
}
- /// Plan the full scan: read latest snapshot, manifest list, manifest
entries, then build DataSplits using bin packing.
+    /// Plan the full scan: resolve the snapshot (via options or latest), then read manifests and build DataSplits.
+    ///
+    /// Time travel is resolved from table options:
+    /// - `scan.snapshot-id` → read that specific snapshot
+    /// - `scan.timestamp-millis` → find the latest snapshot <= that timestamp
+    /// - otherwise → read the latest snapshot
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::DataInvalid` in these cases:
+    /// - both time-travel options are set (they are mutually exclusive);
+    /// - no snapshot is at or before `scan.timestamp-millis`;
+    /// - the requested snapshot cannot be loaded.
+    ///
+    /// Reference: [TimeTravelUtil.tryTravelToSnapshot](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java)
pub async fn plan(&self) -> crate::Result<Plan> {
let file_io = self.table.file_io();
let table_path = self.table.location();
let snapshot_manager = SnapshotManager::new(file_io.clone(),
table_path.to_string());
+        let core_options = CoreOptions::new(self.table.schema().options());
+        let snapshot_id = core_options.scan_snapshot_id();
+        let timestamp_millis = core_options.scan_timestamp_millis();
+        // Reject ambiguous configuration instead of silently letting one option
+        // win (e.g. a persisted table option plus a per-query override).
+        if let (Some(id), Some(ts)) = (snapshot_id, timestamp_millis) {
+            return Err(Error::DataInvalid {
+                message: format!(
+                    "Conflicting time travel options: scan.snapshot-id={id} and scan.timestamp-millis={ts} cannot both be set"
+                ),
+                source: None,
+            });
+        }
- let snapshot = match snapshot_manager.get_latest_snapshot().await? {
- Some(s) => s,
- None => return Ok(Plan::new(Vec::new())),
+        let snapshot = if let Some(id) = snapshot_id {
+            snapshot_manager.get_snapshot(id).await?
+        } else if let Some(ts) = timestamp_millis {
+            match snapshot_manager.earlier_or_equal_time_mills(ts).await? {
+                Some(s) => s,
+                None => {
+                    return Err(Error::DataInvalid {
+                        message: format!("No snapshot found with timestamp <= {ts}"),
+                        source: None,
+                    })
+                }
+            }
+        } else {
+            match snapshot_manager.get_latest_snapshot().await? {
+                Some(s) => s,
+                None => return Ok(Plan::new(Vec::new())),
+            }
};
self.plan_snapshot(snapshot).await
}
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index cdc538a..baa6a04 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -260,6 +260,32 @@ def main():
)
spark.sql("DROP TABLE data_evolution_updates")
+ # ===== Time travel table: multiple snapshots for time travel tests =====
+ # Snapshot 1: rows (1, 'alice'), (2, 'bob')
+ # Snapshot 2: rows (1, 'alice'), (2, 'bob'), (3, 'carol'), (4, 'dave')
+ spark.sql(
+ """
+ CREATE TABLE IF NOT EXISTS time_travel_table (
+ id INT,
+ name STRING
+ ) USING paimon
+ """
+ )
+ spark.sql(
+ """
+ INSERT INTO time_travel_table VALUES
+ (1, 'alice'),
+ (2, 'bob')
+ """
+ )
+ spark.sql(
+ """
+ INSERT INTO time_travel_table VALUES
+ (3, 'carol'),
+ (4, 'dave')
+ """
+ )
+
if __name__ == "__main__":
main()