codope commented on code in PR #411:
URL: https://github.com/apache/hudi-rs/pull/411#discussion_r2269157673
##########
crates/core/src/table/mod.rs:
##########
@@ -1260,6 +1326,58 @@ mod tests {
assert!(file_slice_2.log_files.is_empty());
}
+ #[test]
Review Comment:
Let's also cover a couple of boundary conditions, such as n=0 and end_timestamp=None.
##########
crates/core/src/table/mod.rs:
##########
@@ -520,6 +520,72 @@ impl Table {
})
}
+ /// Get all the changed [FileSlice]s in splits from the table between the
given timestamps.
+ ///
+ /// # Arguments
+ /// * `n` - The number of chunks to split the file slices into.
+ /// * `start_timestamp` - If provided, only file slices that were
changed after this timestamp will be returned.
+ /// * `end_timestamp` - If provided, only file slices that were
changed before or at this timestamp will be returned.
+ ///
+ /// # Notes
+ /// * This API is useful for implementing incremental query with read
parallelism.
+ /// * Uses the same splitting flow as the time-travel API to respect
read parallelism config.
+ pub async fn get_file_slices_splits_between(
+ &self,
+ n: usize,
+ start_timestamp: Option<&str>,
+ end_timestamp: Option<&str>,
+ ) -> Result<Vec<Vec<FileSlice>>> {
+ // If the end timestamp is not provided, use the latest commit
timestamp.
+ let Some(end) =
+ end_timestamp.or_else(||
self.timeline.get_latest_commit_timestamp_as_option())
+ else {
+ // No latest commit timestamp means the table is empty.
+ return Ok(Vec::new());
+ };
+
+ let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP);
+
+ self.get_file_slices_splits_between_internal(n, start, end)
+ .await
+ }
+
+ /// Same as [Table::get_file_slices_splits_between], but blocking.
+ pub fn get_file_slices_splits_between_blocking(
+ &self,
+ n: usize,
Review Comment:
Consider renaming the parameter `n` to `num_splits` for readability in public
APIs. Also, how does n=0 behave? Does it return a single split containing all
elements?
##########
crates/core/src/table/mod.rs:
##########
@@ -520,6 +520,72 @@ impl Table {
})
}
+ /// Get all the changed [FileSlice]s in splits from the table between the
given timestamps.
+ ///
+ /// # Arguments
+ /// * `n` - The number of chunks to split the file slices into.
+ /// * `start_timestamp` - If provided, only file slices that were
changed after this timestamp will be returned.
+ /// * `end_timestamp` - If provided, only file slices that were
changed before or at this timestamp will be returned.
Review Comment:
Are these the same semantics (start_timestamp exclusive, end_timestamp
inclusive) used across other engines? Start-inclusive, end-exclusive feels more
intuitive, but if our semantics match across engines, I am fine with it.
##########
crates/core/src/table/mod.rs:
##########
@@ -520,6 +520,72 @@ impl Table {
})
}
+ /// Get all the changed [FileSlice]s in splits from the table between the
given timestamps.
+ ///
+ /// # Arguments
+ /// * `n` - The number of chunks to split the file slices into.
+ /// * `start_timestamp` - If provided, only file slices that were
changed after this timestamp will be returned.
+ /// * `end_timestamp` - If provided, only file slices that were
changed before or at this timestamp will be returned.
+ ///
+ /// # Notes
+ /// * This API is useful for implementing incremental query with read
parallelism.
+ /// * Uses the same splitting flow as the time-travel API to respect
read parallelism config.
+ pub async fn get_file_slices_splits_between(
+ &self,
+ n: usize,
+ start_timestamp: Option<&str>,
+ end_timestamp: Option<&str>,
+ ) -> Result<Vec<Vec<FileSlice>>> {
+ // If the end timestamp is not provided, use the latest commit
timestamp.
+ let Some(end) =
+ end_timestamp.or_else(||
self.timeline.get_latest_commit_timestamp_as_option())
+ else {
+ // No latest commit timestamp means the table is empty.
+ return Ok(Vec::new());
+ };
+
+ let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP);
+
+ self.get_file_slices_splits_between_internal(n, start, end)
+ .await
+ }
+
+ /// Same as [Table::get_file_slices_splits_between], but blocking.
+ pub fn get_file_slices_splits_between_blocking(
+ &self,
+ n: usize,
+ start_timestamp: Option<&str>,
+ end_timestamp: Option<&str>,
+ ) -> Result<Vec<Vec<FileSlice>>> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async {
+ self.get_file_slices_splits_between(n, start_timestamp,
end_timestamp)
+ .await
+ })
+ }
+
+ async fn get_file_slices_splits_between_internal(
+ &self,
+ n: usize,
+ start_timestamp: &str,
+ end_timestamp: &str,
+ ) -> Result<Vec<Vec<FileSlice>>> {
+ let file_slices =
self.get_file_slices_between_internal(start_timestamp, end_timestamp).await?;
+ if file_slices.is_empty() {
+ return Ok(Vec::new());
+ }
+
+ let n = std::cmp::max(1, n);
+ let chunk_size = file_slices.len().div_ceil(n);
+
+ Ok(file_slices
+ .chunks(chunk_size)
+ .map(|chunk| chunk.to_vec())
+ .collect())
Review Comment:
+1
##########
crates/core/src/table/mod.rs:
##########
@@ -520,6 +520,72 @@ impl Table {
})
}
+ /// Get all the changed [FileSlice]s in splits from the table between the
given timestamps.
+ ///
+ /// # Arguments
+ /// * `n` - The number of chunks to split the file slices into.
+ /// * `start_timestamp` - If provided, only file slices that were
changed after this timestamp will be returned.
+ /// * `end_timestamp` - If provided, only file slices that were
changed before or at this timestamp will be returned.
+ ///
+ /// # Notes
+ /// * This API is useful for implementing incremental query with read
parallelism.
+ /// * Uses the same splitting flow as the time-travel API to respect
read parallelism config.
+ pub async fn get_file_slices_splits_between(
+ &self,
+ n: usize,
+ start_timestamp: Option<&str>,
+ end_timestamp: Option<&str>,
+ ) -> Result<Vec<Vec<FileSlice>>> {
+ // If the end timestamp is not provided, use the latest commit
timestamp.
+ let Some(end) =
+ end_timestamp.or_else(||
self.timeline.get_latest_commit_timestamp_as_option())
+ else {
+ // No latest commit timestamp means the table is empty.
+ return Ok(Vec::new());
+ };
+
+ let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP);
+
+ self.get_file_slices_splits_between_internal(n, start, end)
+ .await
+ }
+
+ /// Same as [Table::get_file_slices_splits_between], but blocking.
+ pub fn get_file_slices_splits_between_blocking(
+ &self,
+ n: usize,
+ start_timestamp: Option<&str>,
+ end_timestamp: Option<&str>,
+ ) -> Result<Vec<Vec<FileSlice>>> {
Review Comment:
Returning `Vec<Vec<FileSlice>>` matches existing APIs, but building it clones
the file slices. This is fine for now and consistent; if you need to optimize
later, a slice-based variant could be introduced without breaking the current
API. To avoid the clones, we need a util that takes `Vec<FileSlice>` and
returns `Vec<Vec<FileSlice>>` by moving, not cloning.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]