yunchipang commented on code in PR #411:
URL: https://github.com/apache/hudi-rs/pull/411#discussion_r2289852381


##########
crates/core/src/table/mod.rs:
##########
@@ -520,6 +520,72 @@ impl Table {
             })
     }
 
+    /// Get all the changed [FileSlice]s in splits from the table between the given timestamps.
+    ///
+    /// # Arguments
+    ///     * `n` - The number of chunks to split the file slices into.
+    ///     * `start_timestamp` - If provided, only file slices that were changed after this timestamp will be returned.
+    ///     * `end_timestamp` - If provided, only file slices that were changed before or at this timestamp will be returned.
+    ///
+    /// # Notes
+    ///     * This API is useful for implementing incremental query with read parallelism.
+    ///     * Uses the same splitting flow as the time-travel API to respect read parallelism config.
+    pub async fn get_file_slices_splits_between(
+        &self,
+        n: usize,
+        start_timestamp: Option<&str>,
+        end_timestamp: Option<&str>,
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        // If the end timestamp is not provided, use the latest commit timestamp.
+        let Some(end) =
+            end_timestamp.or_else(|| self.timeline.get_latest_commit_timestamp_as_option())
+        else {
+            // No latest commit timestamp means the table is empty.
+            return Ok(Vec::new());
+        };
+
+        let start = start_timestamp.unwrap_or(EARLIEST_START_TIMESTAMP);
+
+        self.get_file_slices_splits_between_internal(n, start, end)
+            .await
+    }
+
+    /// Same as [Table::get_file_slices_splits_between], but blocking.
+    pub fn get_file_slices_splits_between_blocking(
+        &self,
+        n: usize,
+        start_timestamp: Option<&str>,
+        end_timestamp: Option<&str>,
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async {
+                self.get_file_slices_splits_between(n, start_timestamp, end_timestamp)
+                    .await
+            })
+    }
+
+    async fn get_file_slices_splits_between_internal(
+        &self,
+        n: usize,
+        start_timestamp: &str,
+        end_timestamp: &str,
+    ) -> Result<Vec<Vec<FileSlice>>> {
+        let file_slices = self.get_file_slices_between_internal(start_timestamp, end_timestamp).await?;
+        if file_slices.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        let n = std::cmp::max(1, n);
+        let chunk_size = file_slices.len().div_ceil(n);
+
+        Ok(file_slices
+            .chunks(chunk_size)
+            .map(|chunk| chunk.to_vec())
+            .collect())

Review Comment:
   thanks for the suggestion. moved!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to