This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new b2c60b1  refactor: add API stubs for performing incremental queries (#220)
b2c60b1 is described below

commit b2c60b13c92bde8e433455633f08815cb1af8a0b
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Dec 6 21:51:22 2024 -1000

    refactor: add API stubs for performing incremental queries (#220)
---
 crates/core/src/table/builder.rs | 11 +++++++-
 crates/core/src/table/mod.rs     | 58 ++++++++++++++++++++++++----------------
 2 files changed, 45 insertions(+), 24 deletions(-)

diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index 20df88e..4ca2997 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -28,7 +28,9 @@ use strum::IntoEnumIterator;
 
 use crate::config::internal::HudiInternalConfig::SkipConfigValidation;
 use crate::config::read::HudiReadConfig;
-use crate::config::table::HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
+use crate::config::table::HudiTableConfig::{
+    DropsPartitionFields, TableType, TableVersion, TimelineLayoutVersion,
+};
 use crate::config::table::TableTypeValue::CopyOnWrite;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::util::{parse_data_for_options, split_hudi_options_from_others};
@@ -265,6 +267,13 @@ impl TableBuilder {
             ));
         }
 
+        let timeline_layout_version = hudi_configs.get(TimelineLayoutVersion)?.to::<isize>();
+        if timeline_layout_version != 1 {
+            return Err(CoreError::Unsupported(
+                "Only support timeline layout version 1.".to_string(),
+            ));
+        }
+
         let drops_partition_cols = hudi_configs
             .get_or_default(DropsPartitionFields)
             .to::<bool>();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index da32a10..9e7a495 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -243,6 +243,10 @@ impl Table {
             .await
     }
 
+    pub fn create_file_group_reader(&self) -> FileGroupReader {
+        FileGroupReader::new(self.file_system_view.storage.clone())
+    }
+
     /// Get all the latest records in the table.
     ///
     /// If the [AsOfTimestamp] configuration is set, the records at the specified timestamp will be returned.
@@ -271,20 +275,22 @@ impl Table {
         Ok(batches)
     }
 
-    #[cfg(test)]
-    async fn get_file_paths_with_filters(
+    /// Get records that were inserted or updated between the given timestamps. Records that were updated multiple times should have their latest states within the time span being returned.
+    pub async fn read_incremental_records(
         &self,
-        filters: &[(&str, &str, &str)],
-    ) -> Result<Vec<String>> {
-        let mut file_paths = Vec::new();
-        for f in self.get_file_slices(filters).await? {
-            file_paths.push(f.base_file_path().to_string());
-        }
-        Ok(file_paths)
+        _start_timestamp: &str,
+        _end_timestamp: Option<&str>,
+    ) -> Result<Vec<RecordBatch>> {
+        todo!("read_incremental_states")
     }
 
-    pub fn create_file_group_reader(&self) -> FileGroupReader {
-        FileGroupReader::new(self.file_system_view.storage.clone())
+    /// Get the change-data-capture (CDC) records between the given timestamps. The CDC records should reflect the records that were inserted, updated, and deleted between the timestamps.
+    pub async fn read_incremental_changes(
+        &self,
+        _start_timestamp: &str,
+        _end_timestamp: Option<&str>,
+    ) -> Result<Vec<RecordBatch>> {
+        todo!("read_incremental_changes")
     }
 }
 
@@ -326,6 +332,18 @@ mod tests {
         .unwrap()
     }
 
+    /// Test helper to get relative file paths from the table with filters.
+    async fn get_file_paths_with_filters(
+        table: &Table,
+        filters: &[(&str, &str, &str)],
+    ) -> Result<Vec<String>> {
+        let mut file_paths = Vec::new();
+        for f in table.get_file_slices(filters).await? {
+            file_paths.push(f.base_file_path().to_string());
+        }
+        Ok(file_paths)
+    }
+
     #[tokio::test]
     async fn test_hudi_table_get_hudi_options() {
         let base_url = TestTable::V6Nonpartitioned.url();
@@ -700,8 +718,7 @@ mod tests {
         assert_eq!(hudi_table.timeline.instants.len(), 2);
 
         let partition_filters = &[];
-        let actual = hudi_table
-            .get_file_paths_with_filters(partition_filters)
+        let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
             .await
             .unwrap()
             .into_iter()
@@ -717,8 +734,7 @@ mod tests {
         assert_eq!(actual, expected);
 
         let partition_filters = &[("byteField", ">=", "10"), ("byteField", "<", "30")];
-        let actual = hudi_table
-            .get_file_paths_with_filters(partition_filters)
+        let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
             .await
             .unwrap()
             .into_iter()
@@ -733,8 +749,7 @@ mod tests {
         assert_eq!(actual, expected);
 
         let partition_filters = &[("byteField", ">", "30")];
-        let actual = hudi_table
-            .get_file_paths_with_filters(partition_filters)
+        let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
             .await
             .unwrap()
             .into_iter()
@@ -750,8 +765,7 @@ mod tests {
         assert_eq!(hudi_table.timeline.instants.len(), 2);
 
         let partition_filters = &[];
-        let actual = hudi_table
-            .get_file_paths_with_filters(partition_filters)
+        let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
             .await
             .unwrap()
             .into_iter()
@@ -771,8 +785,7 @@ mod tests {
             ("byteField", "<", "20"),
             ("shortField", "!=", "100"),
         ];
-        let actual = hudi_table
-            .get_file_paths_with_filters(partition_filters)
+        let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
             .await
             .unwrap()
             .into_iter()
@@ -786,8 +799,7 @@ mod tests {
         assert_eq!(actual, expected);
 
         let partition_filters = &[("byteField", ">", "20"), ("shortField", "=", "300")];
-        let actual = hudi_table
-            .get_file_paths_with_filters(partition_filters)
+        let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
             .await
             .unwrap()
             .into_iter()

Reply via email to the commits mailing list.