This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d5b8e3c feat: add `get_file_slices_splits_between` API (#411)
d5b8e3c is described below
commit d5b8e3c20d896ae7e748635f5b6038105eac4412
Author: Yunchi Pang <[email protected]>
AuthorDate: Thu Aug 21 19:14:36 2025 -0700
feat: add `get_file_slices_splits_between` API (#411)
Add the get_file_slices_splits_between() API, which enables parallel
processing of incremental data changes between timestamps.
---------
Signed-off-by: Yunchi Pang <[email protected]>
---
crates/core/src/table/mod.rs | 145 +++++++++++++++++++++++++++++++------
crates/core/src/util/collection.rs | 58 +++++++++++++++
2 files changed, 181 insertions(+), 22 deletions(-)
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 7d281f7..42e9053 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -106,6 +106,7 @@ use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
use crate::timeline::util::format_timestamp;
use crate::timeline::{Timeline, EARLIEST_START_TIMESTAMP};
+use crate::util::collection::split_into_chunks;
use crate::Result;
use arrow::record_batch::RecordBatch;
use arrow_schema::{Field, Schema};
@@ -294,11 +295,11 @@ impl Table {
/// Get all the [FileSlice]s in splits from the table.
///
/// # Arguments
- /// * `n` - The number of chunks to split the file slices into.
+ /// * `num_splits` - The number of chunks to split the file slices
into.
/// * `filters` - Partition filters to apply.
pub async fn get_file_slices_splits<I, S>(
&self,
- n: usize,
+ num_splits: usize,
filters: I,
) -> Result<Vec<Vec<FileSlice>>>
where
@@ -307,7 +308,7 @@ impl Table {
{
if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() {
let filters = from_str_tuples(filters)?;
- self.get_file_slices_splits_internal(n, timestamp, &filters)
+ self.get_file_slices_splits_internal(num_splits, timestamp,
&filters)
.await
} else {
Ok(Vec::new())
@@ -317,7 +318,7 @@ impl Table {
/// Same as [Table::get_file_slices_splits], but blocking.
pub fn get_file_slices_splits_blocking<I, S>(
&self,
- n: usize,
+ num_splits: usize,
filters: I,
) -> Result<Vec<Vec<FileSlice>>>
where
@@ -327,18 +328,18 @@ impl Table {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?
- .block_on(async { self.get_file_slices_splits(n, filters).await })
+ .block_on(async { self.get_file_slices_splits(num_splits,
filters).await })
}
/// Get all the [FileSlice]s in splits from the table at a given timestamp.
///
/// # Arguments
- /// * `n` - The number of chunks to split the file slices into.
+ /// * `num_splits` - The number of chunks to split the file slices
into.
/// * `timestamp` - The timestamp which file slices associated with.
/// * `filters` - Partition filters to apply.
pub async fn get_file_slices_splits_as_of<I, S>(
&self,
- n: usize,
+ num_splits: usize,
timestamp: &str,
filters: I,
) -> Result<Vec<Vec<FileSlice>>>
@@ -348,14 +349,14 @@ impl Table {
{
let timestamp = format_timestamp(timestamp, &self.timezone())?;
let filters = from_str_tuples(filters)?;
- self.get_file_slices_splits_internal(n, &timestamp, &filters)
+ self.get_file_slices_splits_internal(num_splits, &timestamp, &filters)
.await
}
/// Same as [Table::get_file_slices_splits_as_of], but blocking.
pub fn get_file_slices_splits_as_of_blocking<I, S>(
&self,
- n: usize,
+ num_splits: usize,
timestamp: &str,
filters: I,
) -> Result<Vec<Vec<FileSlice>>>
@@ -367,29 +368,19 @@ impl Table {
.enable_all()
.build()?
.block_on(async {
- self.get_file_slices_splits_as_of(n, timestamp, filters)
+ self.get_file_slices_splits_as_of(num_splits, timestamp,
filters)
.await
})
}
async fn get_file_slices_splits_internal(
&self,
- n: usize,
+ num_splits: usize,
timestamp: &str,
filters: &[Filter],
) -> Result<Vec<Vec<FileSlice>>> {
let file_slices = self.get_file_slices_internal(timestamp,
filters).await?;
- if file_slices.is_empty() {
- return Ok(Vec::new());
- }
-
- let n = std::cmp::max(1, n);
- let chunk_size = file_slices.len().div_ceil(n);
-
- Ok(file_slices
- .chunks(chunk_size)
- .map(|chunk| chunk.to_vec())
- .collect())
+ Ok(split_into_chunks(file_slices, num_splits))
}
/// Get all the [FileSlice]s in the table.
@@ -520,6 +511,64 @@ impl Table {
})
}
+ /// Get all the changed [FileSlice]s in splits from the table between the
given timestamps.
+ ///
+ /// # Arguments
+ /// * `num_splits` - The number of chunks to split the file slices
into.
+ /// * `start_timestamp` - If provided, only file slices that were
changed after this timestamp will be returned.
+ /// * `end_timestamp` - If provided, only file slices that were
changed before or at this timestamp will be returned.
+ ///
+ /// # Notes
+ /// * This API is useful for implementing incremental query with read
parallelism.
+ /// * Uses the same splitting flow as the time-travel API to respect
read parallelism config.
+ pub async fn get_file_slices_splits_between(
+ &self,
+ num_splits: usize,
+ start_timestamp: Option<&str>,
+ end_timestamp: Option<&str>,
+ ) -> Result<Vec<Vec<FileSlice>>> {
+ // If the end timestamp is not provided, use the latest commit
timestamp.
+ let Some(end) =
+ end_timestamp.or_else(||
self.timeline.get_latest_commit_timestamp_as_option())
+ else {
+ // No latest commit timestamp means the table is empty.
+ return Ok(Vec::new());
+ };
+
+ let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP);
+
+ self.get_file_slices_splits_between_internal(num_splits, start, end)
+ .await
+ }
+
+ /// Same as [Table::get_file_slices_splits_between], but blocking.
+ pub fn get_file_slices_splits_between_blocking(
+ &self,
+ num_splits: usize,
+ start_timestamp: Option<&str>,
+ end_timestamp: Option<&str>,
+ ) -> Result<Vec<Vec<FileSlice>>> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async {
+ self.get_file_slices_splits_between(num_splits,
start_timestamp, end_timestamp)
+ .await
+ })
+ }
+
+ async fn get_file_slices_splits_between_internal(
+ &self,
+ num_splits: usize,
+ start_timestamp: &str,
+ end_timestamp: &str,
+ ) -> Result<Vec<Vec<FileSlice>>> {
+ let file_slices = self
+ .get_file_slices_between_internal(start_timestamp, end_timestamp)
+ .await?;
+ Ok(split_into_chunks(file_slices, num_splits))
+ }
+
async fn get_file_slices_between_internal(
&self,
start_timestamp: &str,
@@ -1260,6 +1309,58 @@ mod tests {
assert!(file_slice_2.log_files.is_empty());
}
+ #[test]
+ fn empty_hudi_table_get_file_slices_splits_between() {
+ let base_url = SampleTable::V6Empty.url_to_cow();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let file_slices_splits = hudi_table
+ .get_file_slices_splits_between_blocking(2,
Some(EARLIEST_START_TIMESTAMP), None)
+ .unwrap();
+ assert!(file_slices_splits.is_empty())
+ }
+
+ #[test]
+ fn hudi_table_get_file_slices_splits_between() {
+ let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let file_slices_splits = hudi_table
+ .get_file_slices_splits_between_blocking(2, None,
Some("20250121000656060"))
+ .unwrap();
+
+ assert_eq!(file_slices_splits.len(), 2);
+ let total_file_slices: usize = file_slices_splits.iter().map(|split|
split.len()).sum();
+ assert_eq!(total_file_slices, 3);
+ assert_eq!(file_slices_splits[0].len(), 2);
+ assert_eq!(file_slices_splits[1].len(), 1);
+ }
+
+ #[test]
+ fn hudi_table_get_file_slices_splits_between_with_single_split() {
+ let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let file_slices_splits = hudi_table
+ .get_file_slices_splits_between_blocking(1, None,
Some("20250121000656060"))
+ .unwrap();
+
+ // Should have 1 split with all 3 file slices
+ assert_eq!(file_slices_splits.len(), 1);
+ assert_eq!(file_slices_splits[0].len(), 3);
+ }
+
+ #[test]
+ fn hudi_table_get_file_slices_splits_between_with_many_splits() {
+ let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let file_slices_splits = hudi_table
+ .get_file_slices_splits_between_blocking(10, None,
Some("20250121000656060"))
+ .unwrap();
+
+ assert_eq!(file_slices_splits.len(), 3);
+ for split in &file_slices_splits {
+ assert_eq!(split.len(), 1);
+ }
+ }
+
#[test]
fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
diff --git a/crates/core/src/util/collection.rs
b/crates/core/src/util/collection.rs
index 8c66ac3..c11de7c 100644
--- a/crates/core/src/util/collection.rs
+++ b/crates/core/src/util/collection.rs
@@ -28,3 +28,61 @@ where
target.entry(key.clone()).or_insert_with(|| value.clone());
}
}
+
+/// Split a vector into approximately equal chunks based on the specified
number of splits.
+///
+/// # Arguments
+/// * `items` - The vector to split
+/// * `num_splits` - The desired number of chunks (will be clamped to at least
1)
+pub fn split_into_chunks<T: Clone>(items: Vec<T>, num_splits: usize) ->
Vec<Vec<T>> {
+ if items.is_empty() {
+ return Vec::new();
+ }
+
+ let n = std::cmp::max(1, num_splits);
+ let chunk_size = items.len().div_ceil(n);
+
+ items
+ .chunks(chunk_size)
+ .map(|chunk| chunk.to_vec())
+ .collect()
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_split_into_chunks_zero_splits() {
+ let items = vec![1, 2, 3, 4, 5];
+ let result = split_into_chunks(items.clone(), 0);
+ assert_eq!(result.len(), 1, "Zero splits should be clamped to 1");
+ assert_eq!(result[0], items, "All items should be in single chunk");
+ }
+
+ #[test]
+ fn test_split_into_chunks_empty_input() {
+ let items: Vec<i32> = vec![];
+ let result = split_into_chunks(items, 2);
+ assert!(result.is_empty(), "Empty input should return empty result");
+ }
+
+ #[test]
+ fn test_split_into_chunks_more_splits_than_items() {
+ let items = vec![1, 2, 3];
+ let result = split_into_chunks(items, 5);
+ assert_eq!(result.len(), 3, "Should return 3 chunks (one per item)");
+ assert_eq!(result[0], vec![1]);
+ assert_eq!(result[1], vec![2]);
+ assert_eq!(result[2], vec![3]);
+ }
+
+ #[test]
+ fn test_split_into_chunks_normal_case() {
+ let items = vec![1, 2, 3, 4, 5];
+ let result = split_into_chunks(items, 2);
+ assert_eq!(result.len(), 2, "Should return 2 chunks");
+ assert_eq!(result[0], vec![1, 2, 3], "First chunk should have 3
items");
+ assert_eq!(result[1], vec![4, 5], "Second chunk should have 2 items");
+ }
+}