This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d5b8e3c  feat: add `get_file_slices_splits_between` API (#411)
d5b8e3c is described below

commit d5b8e3c20d896ae7e748635f5b6038105eac4412
Author: Yunchi Pang <[email protected]>
AuthorDate: Thu Aug 21 19:14:36 2025 -0700

    feat: add `get_file_slices_splits_between` API (#411)
    
    Add the get_file_slices_splits_between() API, which enables parallel processing of 
incremental data changes between timestamps.
    
    ---------
    
    Signed-off-by: Yunchi Pang <[email protected]>
---
 crates/core/src/table/mod.rs       | 145 +++++++++++++++++++++++++++++++------
 crates/core/src/util/collection.rs |  58 +++++++++++++++
 2 files changed, 181 insertions(+), 22 deletions(-)

diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 7d281f7..42e9053 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -106,6 +106,7 @@ use crate::table::fs_view::FileSystemView;
 use crate::table::partition::PartitionPruner;
 use crate::timeline::util::format_timestamp;
 use crate::timeline::{Timeline, EARLIEST_START_TIMESTAMP};
+use crate::util::collection::split_into_chunks;
 use crate::Result;
 use arrow::record_batch::RecordBatch;
 use arrow_schema::{Field, Schema};
@@ -294,11 +295,11 @@ impl Table {
     /// Get all the [FileSlice]s in splits from the table.
     ///
     /// # Arguments
-    ///     * `n` - The number of chunks to split the file slices into.
+    ///     * `num_splits` - The number of chunks to split the file slices 
into.
     ///     * `filters` - Partition filters to apply.
     pub async fn get_file_slices_splits<I, S>(
         &self,
-        n: usize,
+        num_splits: usize,
         filters: I,
     ) -> Result<Vec<Vec<FileSlice>>>
     where
@@ -307,7 +308,7 @@ impl Table {
     {
         if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() {
             let filters = from_str_tuples(filters)?;
-            self.get_file_slices_splits_internal(n, timestamp, &filters)
+            self.get_file_slices_splits_internal(num_splits, timestamp, 
&filters)
                 .await
         } else {
             Ok(Vec::new())
@@ -317,7 +318,7 @@ impl Table {
     /// Same as [Table::get_file_slices_splits], but blocking.
     pub fn get_file_slices_splits_blocking<I, S>(
         &self,
-        n: usize,
+        num_splits: usize,
         filters: I,
     ) -> Result<Vec<Vec<FileSlice>>>
     where
@@ -327,18 +328,18 @@ impl Table {
         tokio::runtime::Builder::new_current_thread()
             .enable_all()
             .build()?
-            .block_on(async { self.get_file_slices_splits(n, filters).await })
+            .block_on(async { self.get_file_slices_splits(num_splits, 
filters).await })
     }
 
     /// Get all the [FileSlice]s in splits from the table at a given timestamp.
     ///
     /// # Arguments
-    ///     * `n` - The number of chunks to split the file slices into.
+    ///     * `num_splits` - The number of chunks to split the file slices 
into.
     ///     * `timestamp` - The timestamp which file slices associated with.
     ///     * `filters` - Partition filters to apply.
     pub async fn get_file_slices_splits_as_of<I, S>(
         &self,
-        n: usize,
+        num_splits: usize,
         timestamp: &str,
         filters: I,
     ) -> Result<Vec<Vec<FileSlice>>>
@@ -348,14 +349,14 @@ impl Table {
     {
         let timestamp = format_timestamp(timestamp, &self.timezone())?;
         let filters = from_str_tuples(filters)?;
-        self.get_file_slices_splits_internal(n, &timestamp, &filters)
+        self.get_file_slices_splits_internal(num_splits, &timestamp, &filters)
             .await
     }
 
     /// Same as [Table::get_file_slices_splits_as_of], but blocking.
     pub fn get_file_slices_splits_as_of_blocking<I, S>(
         &self,
-        n: usize,
+        num_splits: usize,
         timestamp: &str,
         filters: I,
     ) -> Result<Vec<Vec<FileSlice>>>
@@ -367,29 +368,19 @@ impl Table {
             .enable_all()
             .build()?
             .block_on(async {
-                self.get_file_slices_splits_as_of(n, timestamp, filters)
+                self.get_file_slices_splits_as_of(num_splits, timestamp, 
filters)
                     .await
             })
     }
 
     async fn get_file_slices_splits_internal(
         &self,
-        n: usize,
+        num_splits: usize,
         timestamp: &str,
         filters: &[Filter],
     ) -> Result<Vec<Vec<FileSlice>>> {
         let file_slices = self.get_file_slices_internal(timestamp, 
filters).await?;
-        if file_slices.is_empty() {
-            return Ok(Vec::new());
-        }
-
-        let n = std::cmp::max(1, n);
-        let chunk_size = file_slices.len().div_ceil(n);
-
-        Ok(file_slices
-            .chunks(chunk_size)
-            .map(|chunk| chunk.to_vec())
-            .collect())
+        Ok(split_into_chunks(file_slices, num_splits))
     }
 
     /// Get all the [FileSlice]s in the table.
@@ -520,6 +511,64 @@ impl Table {
             })
     }
 
+    /// Get all the changed [FileSlice]s in splits from the table between the 
given timestamps.
+    ///
+    /// # Arguments
+    ///     * `num_splits` - The number of chunks to split the file slices 
into.
+    ///     * `start_timestamp` - If provided, only file slices that were 
changed after this timestamp will be returned.
+    ///     * `end_timestamp` - If provided, only file slices that were 
changed before or at this timestamp will be returned.
+    ///
+    /// # Notes
+    ///     * This API is useful for implementing incremental query with read 
parallelism.
+    ///     * Uses the same splitting flow as the time-travel API to respect 
read parallelism config.
+    pub async fn get_file_slices_splits_between(
+        &self,
+        num_splits: usize,
+        start_timestamp: Option<&str>,
+        end_timestamp: Option<&str>,
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        // If the end timestamp is not provided, use the latest commit 
timestamp.
+        let Some(end) =
+            end_timestamp.or_else(|| 
self.timeline.get_latest_commit_timestamp_as_option())
+        else {
+            // No latest commit timestamp means the table is empty.
+            return Ok(Vec::new());
+        };
+
+        let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP);
+
+        self.get_file_slices_splits_between_internal(num_splits, start, end)
+            .await
+    }
+
+    /// Same as [Table::get_file_slices_splits_between], but blocking.
+    pub fn get_file_slices_splits_between_blocking(
+        &self,
+        num_splits: usize,
+        start_timestamp: Option<&str>,
+        end_timestamp: Option<&str>,
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async {
+                self.get_file_slices_splits_between(num_splits, 
start_timestamp, end_timestamp)
+                    .await
+            })
+    }
+
+    async fn get_file_slices_splits_between_internal(
+        &self,
+        num_splits: usize,
+        start_timestamp: &str,
+        end_timestamp: &str,
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        let file_slices = self
+            .get_file_slices_between_internal(start_timestamp, end_timestamp)
+            .await?;
+        Ok(split_into_chunks(file_slices, num_splits))
+    }
+
     async fn get_file_slices_between_internal(
         &self,
         start_timestamp: &str,
@@ -1260,6 +1309,58 @@ mod tests {
         assert!(file_slice_2.log_files.is_empty());
     }
 
+    #[test]
+    fn empty_hudi_table_get_file_slices_splits_between() {
+        let base_url = SampleTable::V6Empty.url_to_cow();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_between_blocking(2, 
Some(EARLIEST_START_TIMESTAMP), None)
+            .unwrap();
+        assert!(file_slices_splits.is_empty())
+    }
+
+    #[test]
+    fn hudi_table_get_file_slices_splits_between() {
+        let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_between_blocking(2, None, 
Some("20250121000656060"))
+            .unwrap();
+
+        assert_eq!(file_slices_splits.len(), 2);
+        let total_file_slices: usize = file_slices_splits.iter().map(|split| 
split.len()).sum();
+        assert_eq!(total_file_slices, 3);
+        assert_eq!(file_slices_splits[0].len(), 2);
+        assert_eq!(file_slices_splits[1].len(), 1);
+    }
+
+    #[test]
+    fn hudi_table_get_file_slices_splits_between_with_single_split() {
+        let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_between_blocking(1, None, 
Some("20250121000656060"))
+            .unwrap();
+
+        // Should have 1 split with all 3 file slices
+        assert_eq!(file_slices_splits.len(), 1);
+        assert_eq!(file_slices_splits[0].len(), 3);
+    }
+
+    #[test]
+    fn hudi_table_get_file_slices_splits_between_with_many_splits() {
+        let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_between_blocking(10, None, 
Some("20250121000656060"))
+            .unwrap();
+
+        assert_eq!(file_slices_splits.len(), 3);
+        for split in &file_slices_splits {
+            assert_eq!(split.len(), 1);
+        }
+    }
+
     #[test]
     fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
         let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
diff --git a/crates/core/src/util/collection.rs 
b/crates/core/src/util/collection.rs
index 8c66ac3..c11de7c 100644
--- a/crates/core/src/util/collection.rs
+++ b/crates/core/src/util/collection.rs
@@ -28,3 +28,61 @@ where
         target.entry(key.clone()).or_insert_with(|| value.clone());
     }
 }
+
+/// Split a vector into at most `num_splits` contiguous chunks of approximately 
equal size.
+///
+/// # Arguments
+/// * `items` - The vector to split
+/// * `num_splits` - The maximum number of chunks to produce (will be clamped to 
at least 1; fewer chunks may be returned)
+pub fn split_into_chunks<T: Clone>(items: Vec<T>, num_splits: usize) -> 
Vec<Vec<T>> {
+    if items.is_empty() {
+        return Vec::new();
+    }
+
+    let n = std::cmp::max(1, num_splits);
+    let chunk_size = items.len().div_ceil(n);
+
+    items
+        .chunks(chunk_size)
+        .map(|chunk| chunk.to_vec())
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_split_into_chunks_zero_splits() {
+        let items = vec![1, 2, 3, 4, 5];
+        let result = split_into_chunks(items.clone(), 0);
+        assert_eq!(result.len(), 1, "Zero splits should be clamped to 1");
+        assert_eq!(result[0], items, "All items should be in single chunk");
+    }
+
+    #[test]
+    fn test_split_into_chunks_empty_input() {
+        let items: Vec<i32> = vec![];
+        let result = split_into_chunks(items, 2);
+        assert!(result.is_empty(), "Empty input should return empty result");
+    }
+
+    #[test]
+    fn test_split_into_chunks_more_splits_than_items() {
+        let items = vec![1, 2, 3];
+        let result = split_into_chunks(items, 5);
+        assert_eq!(result.len(), 3, "Should return 3 chunks (one per item)");
+        assert_eq!(result[0], vec![1]);
+        assert_eq!(result[1], vec![2]);
+        assert_eq!(result[2], vec![3]);
+    }
+
+    #[test]
+    fn test_split_into_chunks_normal_case() {
+        let items = vec![1, 2, 3, 4, 5];
+        let result = split_into_chunks(items, 2);
+        assert_eq!(result.len(), 2, "Should return 2 chunks");
+        assert_eq!(result[0], vec![1, 2, 3], "First chunk should have 3 
items");
+        assert_eq!(result[1], vec![4, 5], "Second chunk should have 2 items");
+    }
+}

Reply via email to