This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 6635792  fix(scan): Harden time-travel selector validation (#219)
6635792 is described below

commit 663579236e4ba38aeb34fb252cd22214a34bf20b
Author: Zach <[email protected]>
AuthorDate: Mon Apr 6 21:23:55 2026 +0800

    fix(scan): Harden time-travel selector validation (#219)
---
 crates/integration_tests/tests/read_tables.rs      |  64 ++++++++
 .../integrations/datafusion/tests/read_tables.rs   |  81 +++++++++++
 crates/paimon/src/spec/core_options.rs             | 162 ++++++++++++++++++++-
 crates/paimon/src/spec/mod.rs                      |   1 +
 crates/paimon/src/table/table_scan.rs              |  51 ++++---
 5 files changed, 332 insertions(+), 27 deletions(-)

diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs
index 008b081..cbe14ee 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -1799,6 +1799,70 @@ async fn test_time_travel_by_tag_name() {
     );
 }
 
+#[tokio::test]
+async fn test_time_travel_conflicting_selectors_fail() {
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, "time_travel_table").await;
+
+    let conflicted = table.copy_with_options(HashMap::from([
+        ("scan.tag-name".to_string(), "snapshot1".to_string()),
+        ("scan.snapshot-id".to_string(), "2".to_string()),
+    ]));
+
+    let plan_err = conflicted
+        .new_read_builder()
+        .new_scan()
+        .plan()
+        .await
+        .expect_err("conflicting time-travel selectors should fail");
+
+    match plan_err {
+        Error::DataInvalid { message, .. } => {
+            assert!(
+                message.contains("Only one time-travel selector may be set"),
+                "unexpected conflict error: {message}"
+            );
+            assert!(
+                message.contains("scan.snapshot-id"),
+                "conflict error should mention scan.snapshot-id: {message}"
+            );
+            assert!(
+                message.contains("scan.tag-name"),
+                "conflict error should mention scan.tag-name: {message}"
+            );
+        }
+        other => panic!("unexpected error: {other:?}"),
+    }
+}
+
+#[tokio::test]
+async fn test_time_travel_invalid_numeric_selector_fails() {
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, "time_travel_table").await;
+
+    let invalid = table.copy_with_options(HashMap::from([(
+        "scan.snapshot-id".to_string(),
+        "not-a-number".to_string(),
+    )]));
+
+    let plan_err = invalid
+        .new_read_builder()
+        .new_scan()
+        .plan()
+        .await
+        .expect_err("invalid numeric time-travel selector should fail");
+
+    match plan_err {
+        Error::DataInvalid { message, .. } => {
+            assert!(
+                message.contains("Invalid value for scan.snapshot-id"),
+                "unexpected invalid selector error: {message}"
+            );
+        }
+        other => panic!("unexpected error: {other:?}"),
+    }
+}
+
 // ---------------------------------------------------------------------------
 // Data evolution + drop column tests
 // ---------------------------------------------------------------------------
diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs
index d438720..d3966ca 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use datafusion::arrow::array::{Array, Int32Array, StringArray};
@@ -57,6 +58,21 @@ async fn create_provider(table_name: &str) -> PaimonTableProvider {
     PaimonTableProvider::try_new(table).expect("Failed to create table provider")
 }
 
+async fn create_provider_with_options(
+    table_name: &str,
+    extra_options: HashMap<String, String>,
+) -> PaimonTableProvider {
+    let catalog = create_catalog();
+    let identifier = Identifier::new("default", table_name);
+    let table = catalog
+        .get_table(&identifier)
+        .await
+        .expect("Failed to get table")
+        .copy_with_options(extra_options);
+
+    PaimonTableProvider::try_new(table).expect("Failed to create table provider")
+}
+
 async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
     let batches = collect_query(table_name, &format!("SELECT id, name FROM {table_name}"))
         .await
@@ -469,6 +485,71 @@ async fn test_time_travel_by_tag_name() {
     );
 }
 
+#[tokio::test]
+async fn test_time_travel_conflicting_selectors_fail() {
+    let provider = create_provider_with_options(
+        "time_travel_table",
+        HashMap::from([("scan.tag-name".to_string(), "snapshot1".to_string())]),
+    )
+    .await;
+
+    let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", "BigQuery");
+    let ctx = SessionContext::new_with_config(config);
+    ctx.register_table("time_travel_table", Arc::new(provider))
+        .expect("Failed to register table");
+    ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))
+        .expect("Failed to register relation planner");
+
+    let err = ctx
+        .sql("SELECT id, name FROM time_travel_table FOR SYSTEM_TIME AS OF 2")
+        .await
+        .expect("time travel query should parse")
+        .collect()
+        .await
+        .expect_err("conflicting time-travel selectors should fail");
+
+    let message = err.to_string();
+    assert!(
+        message.contains("Only one time-travel selector may be set"),
+        "unexpected conflict error: {message}"
+    );
+    assert!(
+        message.contains("scan.snapshot-id"),
+        "conflict error should mention scan.snapshot-id: {message}"
+    );
+    assert!(
+        message.contains("scan.tag-name"),
+        "conflict error should mention scan.tag-name: {message}"
+    );
+}
+
+#[tokio::test]
+async fn test_time_travel_invalid_numeric_selector_fails() {
+    let provider = create_provider_with_options(
+        "time_travel_table",
+        HashMap::from([("scan.snapshot-id".to_string(), "not-a-number".to_string())]),
+    )
+    .await;
+
+    let ctx = SessionContext::new();
+    ctx.register_table("time_travel_table", Arc::new(provider))
+        .expect("Failed to register table");
+
+    let err = ctx
+        .sql("SELECT id, name FROM time_travel_table")
+        .await
+        .expect("query should parse")
+        .collect()
+        .await
+        .expect_err("invalid numeric time-travel selector should fail");
+
+    let message = err.to_string();
+    assert!(
+        message.contains("Invalid value for scan.snapshot-id"),
+        "unexpected invalid selector error: {message}"
+    );
+}
+
 /// Verifies that data evolution merge correctly NULL-fills columns that no file in a
 /// merge group provides (e.g. a newly added column after MERGE INTO on old rows).
 /// Without the fix, `active_file_indices` would be empty and rows would be silently lost.
diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs
index 94f35d6..1d2d6a4 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -40,6 +40,13 @@ pub struct CoreOptions<'a> {
     options: &'a HashMap<String, String>,
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum TimeTravelSelector<'a> {
+    TagName(&'a str),
+    SnapshotId(i64),
+    TimestampMillis(i64),
+}
+
 impl<'a> CoreOptions<'a> {
     pub fn new(options: &'a HashMap<String, String>) -> Self {
         Self { options }
@@ -94,25 +101,90 @@ impl<'a> CoreOptions<'a> {
             .unwrap_or(true)
     }
 
-    /// Snapshot id for time travel via `scan.snapshot-id`.
+    fn parse_i64_option(&self, option_name: &'static str) -> crate::Result<Option<i64>> {
+        match self.options.get(option_name) {
+            Some(value) => value
+                .parse::<i64>()
+                .map(Some)
+                .map_err(|e| crate::Error::DataInvalid {
+                    message: format!("Invalid value for {option_name}: '{value}'"),
+                    source: Some(Box::new(e)),
+                }),
+            None => Ok(None),
+        }
+    }
+
+    /// Raw snapshot id accessor for `scan.snapshot-id`.
+    ///
+    /// This compatibility accessor is lossy: it returns `None` for absent or
+    /// invalid values and does not validate selector conflicts. Internal
+    /// time-travel planning should use `try_time_travel_selector`.
     pub fn scan_snapshot_id(&self) -> Option<i64> {
         self.options
             .get(SCAN_SNAPSHOT_ID_OPTION)
             .and_then(|v| v.parse().ok())
     }
 
-    /// Timestamp in millis for time travel via `scan.timestamp-millis`.
+    /// Raw timestamp accessor for `scan.timestamp-millis`.
+    ///
+    /// This compatibility accessor is lossy: it returns `None` for absent or
+    /// invalid values and does not validate selector conflicts. Internal
+    /// time-travel planning should use `try_time_travel_selector`.
     pub fn scan_timestamp_millis(&self) -> Option<i64> {
         self.options
             .get(SCAN_TIMESTAMP_MILLIS_OPTION)
             .and_then(|v| v.parse().ok())
     }
 
-    /// Tag name for time travel via `scan.tag-name`.
-    pub fn scan_tag_name(&self) -> Option<&str> {
+    /// Raw tag name accessor for `scan.tag-name`.
+    ///
+    /// This compatibility accessor does not validate selector conflicts.
+    /// Internal time-travel planning should use `try_time_travel_selector`.
+    pub fn scan_tag_name(&self) -> Option<&'a str> {
         self.options.get(SCAN_TAG_NAME_OPTION).map(String::as_str)
     }
 
+    fn configured_time_travel_selectors(&self) -> Vec<&'static str> {
+        let mut selectors = Vec::with_capacity(3);
+        if self.options.contains_key(SCAN_TAG_NAME_OPTION) {
+            selectors.push(SCAN_TAG_NAME_OPTION);
+        }
+        if self.options.contains_key(SCAN_SNAPSHOT_ID_OPTION) {
+            selectors.push(SCAN_SNAPSHOT_ID_OPTION);
+        }
+        if self.options.contains_key(SCAN_TIMESTAMP_MILLIS_OPTION) {
+            selectors.push(SCAN_TIMESTAMP_MILLIS_OPTION);
+        }
+        selectors
+    }
+
+    /// Validates and normalizes the internal time-travel selector.
+    ///
+    /// This is the semantic owner for selector mutual exclusion and strict
+    /// numeric parsing.
+    pub(crate) fn try_time_travel_selector(&self) -> crate::Result<Option<TimeTravelSelector<'a>>> {
+        let selectors = self.configured_time_travel_selectors();
+        if selectors.len() > 1 {
+            return Err(crate::Error::DataInvalid {
+                message: format!(
+                    "Only one time-travel selector may be set, found: {}",
+                    selectors.join(", ")
+                ),
+                source: None,
+            });
+        }
+
+        if let Some(tag_name) = self.scan_tag_name() {
+            Ok(Some(TimeTravelSelector::TagName(tag_name)))
+        } else if let Some(id) = self.parse_i64_option(SCAN_SNAPSHOT_ID_OPTION)? {
+            Ok(Some(TimeTravelSelector::SnapshotId(id)))
+        } else if let Some(ts) = self.parse_i64_option(SCAN_TIMESTAMP_MILLIS_OPTION)? {
+            Ok(Some(TimeTravelSelector::TimestampMillis(ts)))
+        } else {
+            Ok(None)
+        }
+    }
+
     /// Explicit bucket key columns. If not set, defaults to primary keys for PK tables.
     pub fn bucket_key(&self) -> Option<Vec<String>> {
         self.options
@@ -230,4 +302,86 @@ mod tests {
         assert_eq!(core.partition_default_name(), "NULL_PART");
         assert!(!core.legacy_partition_name());
     }
+
+    #[test]
+    fn test_try_time_travel_selector_rejects_conflicting_selectors() {
+        let options = HashMap::from([
+            (SCAN_TAG_NAME_OPTION.to_string(), "tag1".to_string()),
+            (SCAN_SNAPSHOT_ID_OPTION.to_string(), "7".to_string()),
+        ]);
+        let core = CoreOptions::new(&options);
+
+        let err = core
+            .try_time_travel_selector()
+            .expect_err("conflicting selectors should fail");
+        match err {
+            crate::Error::DataInvalid { message, .. } => {
+                assert!(message.contains("Only one time-travel selector may be set"));
+                assert!(message.contains(SCAN_TAG_NAME_OPTION));
+                assert!(message.contains(SCAN_SNAPSHOT_ID_OPTION));
+            }
+            other => panic!("unexpected error: {other:?}"),
+        }
+    }
+
+    #[test]
+    fn test_try_time_travel_selector_rejects_invalid_numeric_values() {
+        let snapshot_options =
+            HashMap::from([(SCAN_SNAPSHOT_ID_OPTION.to_string(), "abc".to_string())]);
+        let snapshot_core = CoreOptions::new(&snapshot_options);
+
+        let snapshot_err = snapshot_core
+            .try_time_travel_selector()
+            .expect_err("invalid snapshot id should fail");
+        match snapshot_err {
+            crate::Error::DataInvalid { message, .. } => {
+                assert!(message.contains(SCAN_SNAPSHOT_ID_OPTION));
+            }
+            other => panic!("unexpected error: {other:?}"),
+        }
+
+        let timestamp_options =
+            HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "xyz".to_string())]);
+        let timestamp_core = CoreOptions::new(&timestamp_options);
+
+        let timestamp_err = timestamp_core
+            .try_time_travel_selector()
+            .expect_err("invalid timestamp millis should fail");
+        match timestamp_err {
+            crate::Error::DataInvalid { message, .. } => {
+                assert!(message.contains(SCAN_TIMESTAMP_MILLIS_OPTION));
+            }
+            other => panic!("unexpected error: {other:?}"),
+        }
+    }
+
+    #[test]
+    fn test_try_time_travel_selector_normalizes_valid_selector() {
+        let tag_options = HashMap::from([(SCAN_TAG_NAME_OPTION.to_string(), "tag1".to_string())]);
+        let tag_core = CoreOptions::new(&tag_options);
+        assert_eq!(
+            tag_core.try_time_travel_selector().expect("tag selector"),
+            Some(TimeTravelSelector::TagName("tag1"))
+        );
+
+        let snapshot_options =
+            HashMap::from([(SCAN_SNAPSHOT_ID_OPTION.to_string(), "7".to_string())]);
+        let snapshot_core = CoreOptions::new(&snapshot_options);
+        assert_eq!(
+            snapshot_core
+                .try_time_travel_selector()
+                .expect("snapshot selector"),
+            Some(TimeTravelSelector::SnapshotId(7))
+        );
+
+        let timestamp_options =
+            HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "1234".to_string())]);
+        let timestamp_core = CoreOptions::new(&timestamp_options);
+        assert_eq!(
+            timestamp_core
+                .try_time_travel_selector()
+                .expect("timestamp selector"),
+            Some(TimeTravelSelector::TimestampMillis(1234))
+        );
+    }
 }
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index 2cddd82..50402a3 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -26,6 +26,7 @@ mod data_file;
 pub use data_file::*;
 
 mod core_options;
+pub(crate) use core_options::TimeTravelSelector;
 pub use core_options::*;
 
 mod schema;
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index da2dc24..f014b4b 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -31,7 +31,7 @@ use crate::io::FileIO;
 use crate::predicate_stats::data_leaf_may_match;
 use crate::spec::{
     eval_row, BinaryRow, CoreOptions, DataField, DataFileMeta, FileKind, IndexManifest,
-    ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot,
+    ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot, TimeTravelSelector,
 };
 use crate::table::bin_pack::split_for_batch;
 use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan};
@@ -326,47 +326,52 @@ impl<'a> TableScan<'a> {
     /// Plan the full scan: resolve snapshot (via options or latest), then read manifests and build DataSplits.
     ///
     /// Time travel is resolved from table options:
+    /// - only one of `scan.tag-name`, `scan.snapshot-id`, `scan.timestamp-millis` may be set
+    /// - `scan.tag-name` → read the snapshot pinned by that tag
     /// - `scan.snapshot-id` → read that specific snapshot
     /// - `scan.timestamp-millis` → find the latest snapshot <= that timestamp
     /// - otherwise → read the latest snapshot
     ///
     /// Reference: [TimeTravelUtil.tryTravelToSnapshot](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java)
     pub async fn plan(&self) -> crate::Result<Plan> {
+        let snapshot = match self.resolve_snapshot().await? {
+            Some(snapshot) => snapshot,
+            None => return Ok(Plan::new(Vec::new())),
+        };
+        self.plan_snapshot(snapshot).await
+    }
+
+    async fn resolve_snapshot(&self) -> crate::Result<Option<Snapshot>> {
         let file_io = self.table.file_io();
         let table_path = self.table.location();
         let snapshot_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
         let core_options = CoreOptions::new(self.table.schema().options());
 
-        let snapshot = if let Some(tag_name) = core_options.scan_tag_name() {
-            let tag_manager = TagManager::new(file_io.clone(), table_path.to_string());
-            match tag_manager.get(tag_name).await? {
-                Some(s) => s,
-                None => {
-                    return Err(Error::DataInvalid {
+        match core_options.try_time_travel_selector()? {
+            Some(TimeTravelSelector::TagName(tag_name)) => {
+                let tag_manager = TagManager::new(file_io.clone(), table_path.to_string());
+                match tag_manager.get(tag_name).await? {
+                    Some(s) => Ok(Some(s)),
+                    None => Err(Error::DataInvalid {
                         message: format!("Tag '{tag_name}' doesn't exist."),
                         source: None,
-                    })
+                    }),
                 }
             }
-        } else if let Some(id) = core_options.scan_snapshot_id() {
-            snapshot_manager.get_snapshot(id).await?
-        } else if let Some(ts) = core_options.scan_timestamp_millis() {
-            match snapshot_manager.earlier_or_equal_time_mills(ts).await? {
-                Some(s) => s,
-                None => {
-                    return Err(Error::DataInvalid {
+            Some(TimeTravelSelector::SnapshotId(id)) => {
+                snapshot_manager.get_snapshot(id).await.map(Some)
+            }
+            Some(TimeTravelSelector::TimestampMillis(ts)) => {
+                match snapshot_manager.earlier_or_equal_time_mills(ts).await? {
+                    Some(s) => Ok(Some(s)),
+                    None => Err(Error::DataInvalid {
                         message: format!("No snapshot found with timestamp <= {ts}"),
                         source: None,
-                    })
+                    }),
                 }
             }
-        } else {
-            match snapshot_manager.get_latest_snapshot().await? {
-                Some(s) => s,
-                None => return Ok(Plan::new(Vec::new())),
-            }
-        };
-        self.plan_snapshot(snapshot).await
+            None => snapshot_manager.get_latest_snapshot().await,
+        }
     }
 
     /// Apply a limit-pushdown hint to the generated splits.

Reply via email to