codope commented on code in PR #411:
URL: https://github.com/apache/hudi-rs/pull/411#discussion_r2269157673


##########
crates/core/src/table/mod.rs:
##########
@@ -1260,6 +1326,58 @@ mod tests {
         assert!(file_slice_2.log_files.is_empty());
     }
 
+    #[test]

Review Comment:
   Let's also add tests for a couple of boundary conditions, such as n=0 and
end_timestamp=None.



##########
crates/core/src/table/mod.rs:
##########
@@ -520,6 +520,72 @@ impl Table {
             })
     }
 
+    /// Get all the changed [FileSlice]s in splits from the table between the 
given timestamps.
+    ///
+    /// # Arguments
+    ///     * `n` - The number of chunks to split the file slices into.
+    ///     * `start_timestamp` - If provided, only file slices that were 
changed after this timestamp will be returned.
+    ///     * `end_timestamp` - If provided, only file slices that were 
changed before or at this timestamp will be returned.
+    ///
+    /// # Notes
+    ///     * This API is useful for implementing incremental query with read 
parallelism.
+    ///     * Uses the same splitting flow as the time-travel API to respect 
read parallelism config.
+    pub async fn get_file_slices_splits_between(
+        &self,
+        n: usize,
+        start_timestamp: Option<&str>,
+        end_timestamp: Option<&str>,
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        // If the end timestamp is not provided, use the latest commit 
timestamp.
+        let Some(end) =
+            end_timestamp.or_else(|| 
self.timeline.get_latest_commit_timestamp_as_option())
+        else {
+            // No latest commit timestamp means the table is empty.
+            return Ok(Vec::new());
+        };
+
+        let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP);
+
+        self.get_file_slices_splits_between_internal(n, start, end)
+            .await
+    }
+
+    /// Same as [Table::get_file_slices_splits_between], but blocking.
+    pub fn get_file_slices_splits_between_blocking(
+        &self,
+        n: usize,

Review Comment:
   Consider renaming the parameter `n` to `num_splits` for readability in 
public APIs. Also, how does n=0 behave? Does it return a single split 
containing all elements?



##########
crates/core/src/table/mod.rs:
##########
@@ -520,6 +520,72 @@ impl Table {
             })
     }
 
+    /// Get all the changed [FileSlice]s in splits from the table between the 
given timestamps.
+    ///
+    /// # Arguments
+    ///     * `n` - The number of chunks to split the file slices into.
+    ///     * `start_timestamp` - If provided, only file slices that were 
changed after this timestamp will be returned.
+    ///     * `end_timestamp` - If provided, only file slices that were 
changed before or at this timestamp will be returned.

Review Comment:
   Are these the same semantics (start_timestamp exclusive, end_timestamp 
inclusive) as in other engines? Start-inclusive, end-exclusive feels more 
intuitive, but if our semantics match across engines, I am fine with it.



##########
crates/core/src/table/mod.rs:
##########
@@ -520,6 +520,72 @@ impl Table {
             })
     }
 
+    /// Get all the changed [FileSlice]s in splits from the table between the 
given timestamps.
+    ///
+    /// # Arguments
+    ///     * `n` - The number of chunks to split the file slices into.
+    ///     * `start_timestamp` - If provided, only file slices that were 
changed after this timestamp will be returned.
+    ///     * `end_timestamp` - If provided, only file slices that were 
changed before or at this timestamp will be returned.
+    ///
+    /// # Notes
+    ///     * This API is useful for implementing incremental query with read 
parallelism.
+    ///     * Uses the same splitting flow as the time-travel API to respect 
read parallelism config.
+    pub async fn get_file_slices_splits_between(
+        &self,
+        n: usize,
+        start_timestamp: Option<&str>,
+        end_timestamp: Option<&str>,
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        // If the end timestamp is not provided, use the latest commit 
timestamp.
+        let Some(end) =
+            end_timestamp.or_else(|| 
self.timeline.get_latest_commit_timestamp_as_option())
+        else {
+            // No latest commit timestamp means the table is empty.
+            return Ok(Vec::new());
+        };
+
+        let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP);
+
+        self.get_file_slices_splits_between_internal(n, start, end)
+            .await
+    }
+
+    /// Same as [Table::get_file_slices_splits_between], but blocking.
+    pub fn get_file_slices_splits_between_blocking(
+        &self,
+        n: usize,
+        start_timestamp: Option<&str>,
+        end_timestamp: Option<&str>,
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async {
+                self.get_file_slices_splits_between(n, start_timestamp, 
end_timestamp)
+                    .await
+            })
+    }
+
+    async fn get_file_slices_splits_between_internal(
+        &self,
+        n: usize,
+        start_timestamp: &str,
+        end_timestamp: &str,
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        let file_slices = 
self.get_file_slices_between_internal(start_timestamp, end_timestamp).await?;
+        if file_slices.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        let n = std::cmp::max(1, n);
+        let chunk_size = file_slices.len().div_ceil(n);
+
+        Ok(file_slices
+            .chunks(chunk_size)
+            .map(|chunk| chunk.to_vec())
+            .collect())

Review Comment:
   +1



##########
crates/core/src/table/mod.rs:
##########
@@ -520,6 +520,72 @@ impl Table {
             })
     }
 
+    /// Get all the changed [FileSlice]s in splits from the table between the 
given timestamps.
+    ///
+    /// # Arguments
+    ///     * `n` - The number of chunks to split the file slices into.
+    ///     * `start_timestamp` - If provided, only file slices that were 
changed after this timestamp will be returned.
+    ///     * `end_timestamp` - If provided, only file slices that were 
changed before or at this timestamp will be returned.
+    ///
+    /// # Notes
+    ///     * This API is useful for implementing incremental query with read 
parallelism.
+    ///     * Uses the same splitting flow as the time-travel API to respect 
read parallelism config.
+    pub async fn get_file_slices_splits_between(
+        &self,
+        n: usize,
+        start_timestamp: Option<&str>,
+        end_timestamp: Option<&str>,
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        // If the end timestamp is not provided, use the latest commit 
timestamp.
+        let Some(end) =
+            end_timestamp.or_else(|| 
self.timeline.get_latest_commit_timestamp_as_option())
+        else {
+            // No latest commit timestamp means the table is empty.
+            return Ok(Vec::new());
+        };
+
+        let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP);
+
+        self.get_file_slices_splits_between_internal(n, start, end)
+            .await
+    }
+
+    /// Same as [Table::get_file_slices_splits_between], but blocking.
+    pub fn get_file_slices_splits_between_blocking(
+        &self,
+        n: usize,
+        start_timestamp: Option<&str>,
+        end_timestamp: Option<&str>,
+    ) -> Result<Vec<Vec<FileSlice>>> {

Review Comment:
   Returning `Vec<Vec<FileSlice>>` matches the existing APIs, but building it 
clones every file slice. This is fine for now and consistent; if you need to 
optimize later, a slice-based variant could be introduced without breaking the 
current API. To avoid the clones, we need a util that takes `Vec<FileSlice>` 
and returns `Vec<Vec<FileSlice>>` by moving, not cloning.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to