This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new aa5bd8e  fix: resolve env vars for creating standalone file group 
reader (#345)
aa5bd8e is described below

commit aa5bd8e2565a09a7702678a6983542652b151ff0
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Jun 2 17:22:00 2025 -0500

    fix: resolve env vars for creating standalone file group reader (#345)
---
 crates/core/src/file_group/reader.rs               | 117 ++++++++++++++++-----
 crates/core/src/table/builder.rs                   |  43 +++++---
 crates/core/src/table/mod.rs                       |   2 +-
 .../.hoodie/hoodie.properties                      |  26 +++++
 demo/apps/hudi-file-group-api/cpp/main.cpp         |  11 +-
 python/src/internal.rs                             |   5 +-
 python/tests/test_file_group_read.py               |  39 +++++++
 7 files changed, 194 insertions(+), 49 deletions(-)

diff --git a/crates/core/src/file_group/reader.rs 
b/crates/core/src/file_group/reader.rs
index 5d79ce8..d94c9d1 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -27,6 +27,7 @@ use crate::file_group::log_file::scanner::LogFileScanner;
 use crate::merge::record_merger::RecordMerger;
 use crate::metadata::meta_field::MetaField;
 use crate::storage::Storage;
+use crate::table::builder::OptionResolver;
 use crate::timeline::selector::InstantRange;
 use crate::Result;
 use arrow::compute::and;
@@ -44,16 +45,20 @@ pub struct FileGroupReader {
 }
 
 impl FileGroupReader {
-    pub(crate) fn new_with_configs_and_options<I, K, V>(
+    /// Creates a new reader with the given Hudi configurations and 
overwriting options.
+    ///
+    /// # Notes
+    /// This API does **not** use [`OptionResolver`] that loads table 
properties from storage to resolve options.
+    pub(crate) fn new_with_configs_and_overwriting_options<I, K, V>(
         hudi_configs: Arc<HudiConfigs>,
-        options: I,
+        overwriting_options: I,
     ) -> Result<Self>
     where
         I: IntoIterator<Item = (K, V)>,
         K: AsRef<str>,
         V: Into<String>,
     {
-        let (hudi_opts, others) = split_hudi_options_from_others(options);
+        let (hudi_opts, others) = 
split_hudi_options_from_others(overwriting_options);
 
         let mut final_opts = hudi_configs.as_options();
         final_opts.extend(hudi_opts);
@@ -71,18 +76,30 @@ impl FileGroupReader {
     /// # Arguments
     ///     * `base_uri` - The base URI of the file group's residing table.
     ///     * `options` - Additional options for the reader.
+    ///
+    /// # Notes
+    /// This API uses [`OptionResolver`] that loads table properties from 
storage to resolve options.
     pub fn new_with_options<I, K, V>(base_uri: &str, options: I) -> 
Result<Self>
     where
         I: IntoIterator<Item = (K, V)>,
         K: AsRef<str>,
         V: Into<String>,
     {
-        let hudi_configs = Arc::new(HudiConfigs::new([(
-            HudiTableConfig::BasePath.as_ref().to_string(),
-            base_uri.to_string(),
-        )]));
-
-        Self::new_with_configs_and_options(hudi_configs, options)
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async {
+                let mut resolver = OptionResolver::new_with_options(base_uri, 
options);
+                resolver.resolve_options().await?;
+                let hudi_configs = 
Arc::new(HudiConfigs::new(resolver.hudi_options));
+                let storage =
+                    Storage::new(Arc::new(resolver.storage_options), 
hudi_configs.clone())?;
+
+                Ok(Self {
+                    hudi_configs,
+                    storage,
+                })
+            })
     }
 
     fn create_filtering_mask_for_base_file_records(
@@ -251,30 +268,79 @@ mod tests {
     use super::*;
     use crate::config::util::empty_options;
     use crate::error::CoreError;
+    use crate::Result;
     use arrow::array::{ArrayRef, Int64Array, StringArray};
     use arrow::record_batch::RecordBatch;
     use arrow_schema::{DataType, Field, Schema};
+    use std::fs::canonicalize;
+    use std::path::PathBuf;
     use std::sync::Arc;
+    use url::Url;
+
+    fn get_non_existent_base_uri() -> String {
+        "file:///non-existent-path/table".to_string()
+    }
+
+    fn get_base_uri_with_valid_props() -> String {
+        let url = Url::from_file_path(
+            canonicalize(
+                PathBuf::from("tests")
+                    .join("data")
+                    .join("table_props_valid"),
+            )
+            .unwrap(),
+        )
+        .unwrap();
+        url.as_ref().to_string()
+    }
+
+    fn get_base_uri_with_valid_props_minimum() -> String {
+        let url = Url::from_file_path(
+            canonicalize(
+                PathBuf::from("tests")
+                    .join("data")
+                    .join("table_props_valid_minimum"),
+            )
+            .unwrap(),
+        )
+        .unwrap();
+        url.as_ref().to_string()
+    }
+
+    fn get_base_uri_with_invalid_props() -> String {
+        let url = Url::from_file_path(
+            canonicalize(
+                PathBuf::from("tests")
+                    .join("data")
+                    .join("table_props_invalid"),
+            )
+            .unwrap(),
+        )
+        .unwrap();
+        url.as_ref().to_string()
+    }
 
     #[test]
-    fn test_new_with_options() -> Result<()> {
+    fn test_new_with_options() {
         let options = vec![("key1", "value1"), ("key2", "value2")];
-        let reader = FileGroupReader::new_with_options("/tmp/hudi_data", 
options)?;
+        let base_uri = get_base_uri_with_valid_props();
+        let reader = FileGroupReader::new_with_options(&base_uri, 
options).unwrap();
         assert!(!reader.storage.options.is_empty());
         assert!(reader
             .storage
             .hudi_configs
             .contains(HudiTableConfig::BasePath));
-        Ok(())
     }
 
     #[test]
-    fn test_read_file_slice_returns_error() {
-        let reader =
-            
FileGroupReader::new_with_options("file:///non-existent-path/table", 
empty_options())
-                .unwrap();
-        let result = 
reader.read_file_slice_by_base_file_path_blocking("non_existent_file");
-        assert!(matches!(result.unwrap_err(), ReadFileSliceError(_)));
+    fn test_new_with_options_invalid_base_uri_or_invalid_props() {
+        let base_uri = get_non_existent_base_uri();
+        let result = FileGroupReader::new_with_options(&base_uri, 
empty_options());
+        assert!(result.is_err());
+
+        let base_uri = get_base_uri_with_invalid_props();
+        let result = FileGroupReader::new_with_options(&base_uri, 
empty_options());
+        assert!(result.is_err())
     }
 
     fn create_test_record_batch() -> Result<RecordBatch> {
@@ -296,11 +362,12 @@ mod tests {
 
     #[test]
     fn test_create_filtering_mask_for_base_file_records() -> Result<()> {
-        let base_uri = "file:///non-existent-path/table";
+        let base_uri = get_base_uri_with_valid_props_minimum();
         let records = create_test_record_batch()?;
-        // Test case 1: No meta fields populated
+
+        // Test case 1: Disable populating the meta fields
         let reader = FileGroupReader::new_with_options(
-            base_uri,
+            &base_uri,
             [
                 (HudiTableConfig::PopulatesMetaFields.as_ref(), "false"),
                 (HudiReadConfig::FileGroupStartTimestamp.as_ref(), "2"),
@@ -310,13 +377,13 @@ mod tests {
         assert_eq!(mask, None, "Commit time filtering should not be needed");
 
         // Test case 2: No commit time filtering options
-        let reader = FileGroupReader::new_with_options(base_uri, 
empty_options())?;
+        let reader = FileGroupReader::new_with_options(&base_uri, 
empty_options())?;
         let mask = 
reader.create_filtering_mask_for_base_file_records(&records)?;
         assert_eq!(mask, None);
 
         // Test case 3: Filtering commit time > '2'
         let reader = FileGroupReader::new_with_options(
-            base_uri,
+            &base_uri,
             [(HudiReadConfig::FileGroupStartTimestamp, "2")],
         )?;
         let mask = 
reader.create_filtering_mask_for_base_file_records(&records)?;
@@ -328,7 +395,7 @@ mod tests {
 
         // Test case 4: Filtering commit time <= '4'
         let reader = FileGroupReader::new_with_options(
-            base_uri,
+            &base_uri,
             [(HudiReadConfig::FileGroupEndTimestamp, "4")],
         )?;
         let mask = 
reader.create_filtering_mask_for_base_file_records(&records)?;
@@ -336,7 +403,7 @@ mod tests {
 
         // Test case 5: Filtering commit time > '2' and <= '4'
         let reader = FileGroupReader::new_with_options(
-            base_uri,
+            &base_uri,
             [
                 (HudiReadConfig::FileGroupStartTimestamp, "2"),
                 (HudiReadConfig::FileGroupEndTimestamp, "4"),
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index ebb72e2..d123126 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -43,10 +43,10 @@ pub struct TableBuilder {
 /// Resolver for options including Hudi options, storage options, and generic 
options.
 #[derive(Debug, Clone)]
 pub struct OptionResolver {
-    base_uri: String,
-    hudi_options: HashMap<String, String>,
-    storage_options: HashMap<String, String>,
-    options: HashMap<String, String>,
+    pub base_uri: String,
+    pub hudi_options: HashMap<String, String>,
+    pub storage_options: HashMap<String, String>,
+    pub options: HashMap<String, String>,
 }
 
 macro_rules! impl_with_options {
@@ -106,11 +106,7 @@ impl TableBuilder {
 
         option_resolver.resolve_options().await?;
 
-        let hudi_configs = 
HudiConfigs::new(self.option_resolver.hudi_options.iter());
-
-        validate_configs(&hudi_configs).expect("Hudi configs are not valid.");
-
-        let hudi_configs = Arc::from(hudi_configs);
+        let hudi_configs = 
Arc::from(HudiConfigs::new(option_resolver.hudi_options.iter()));
 
         let storage_options = 
Arc::from(self.option_resolver.storage_options.clone());
 
@@ -140,6 +136,25 @@ impl OptionResolver {
         }
     }
 
+    /// Create a new [OptionResolver] with the given base URI and options.
+    pub fn new_with_options<I, K, V>(base_uri: &str, options: I) -> Self
+    where
+        I: IntoIterator<Item = (K, V)>,
+        K: AsRef<str>,
+        V: Into<String>,
+    {
+        let options = options
+            .into_iter()
+            .map(|(k, v)| (k.as_ref().to_string(), v.into()))
+            .collect();
+        Self {
+            base_uri: base_uri.to_string(),
+            hudi_options: HashMap::new(),
+            storage_options: HashMap::new(),
+            options,
+        }
+    }
+
     /// Resolve all options by combining the ones from hoodie.properties, 
user-provided options,
     /// env vars, and global Hudi configs. The precedence order is as follows:
     ///
@@ -150,16 +165,20 @@ impl OptionResolver {
     /// 5. Global Hudi configs
     ///
     /// [note] Error may occur when 1 and 2 have conflicts.
-    async fn resolve_options(&mut self) -> Result<()> {
+    pub async fn resolve_options(&mut self) -> Result<()> {
         self.resolve_user_provided_options();
 
-        // if any user-provided options are intended for cloud storage and in 
uppercase,
+        // If any user-provided options are intended for cloud storage and in 
uppercase,
         // convert them to lowercase. This is to allow `object_store` to pick 
them up.
         self.resolve_cloud_env_vars();
 
         // At this point, we have resolved the storage options needed for 
accessing the storage layer.
         // We can now resolve the hudi options
-        self.resolve_hudi_options().await
+        self.resolve_hudi_options().await?;
+
+        // Validate the resolved Hudi options
+        let hudi_configs = HudiConfigs::new(self.hudi_options.iter());
+        validate_configs(&hudi_configs)
     }
 
     fn resolve_user_provided_options(&mut self) {
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 7cb72da..ad1c89b 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -541,7 +541,7 @@ impl Table {
         for (k, v) in options {
             overwriting_options.insert(k.as_ref().to_string(), v.into());
         }
-        FileGroupReader::new_with_configs_and_options(
+        FileGroupReader::new_with_configs_and_overwriting_options(
             self.hudi_configs.clone(),
             overwriting_options,
         )
diff --git 
a/crates/core/tests/data/table_props_valid_minimum/.hoodie/hoodie.properties 
b/crates/core/tests/data/table_props_valid_minimum/.hoodie/hoodie.properties
new file mode 100644
index 0000000..2c2fdbf
--- /dev/null
+++ b/crates/core/tests/data/table_props_valid_minimum/.hoodie/hoodie.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.table.type=copy_on_write
+hoodie.table.checksum=3761586722
+hoodie.table.recordkey.fields=uuid
+hoodie.table.name=trips
+hoodie.table.version=6
+hoodie.table.timeline.timezone=local
+hoodie.timeline.layout.version=1
diff --git a/demo/apps/hudi-file-group-api/cpp/main.cpp 
b/demo/apps/hudi-file-group-api/cpp/main.cpp
index ff58b6b..568725e 100644
--- a/demo/apps/hudi-file-group-api/cpp/main.cpp
+++ b/demo/apps/hudi-file-group-api/cpp/main.cpp
@@ -138,16 +138,7 @@ int main() {
 
         auto base_uri = "s3://hudi-demo/cow/v6_nonpartitioned";
 
-        const char* aws_access_key_id = std::getenv("AWS_ACCESS_KEY_ID");
-        const char* aws_secret_access_key = 
std::getenv("AWS_SECRET_ACCESS_KEY");
-        
-        std::vector<std::string> opts{
-            "aws_access_key_id=" + std::string(aws_access_key_id),
-            "aws_secret_access_key=" + std::string(aws_secret_access_key),
-            "aws_endpoint_url=http://minio:9000",
-            "aws_allow_http=true",
-            "aws_region=us-east-1"
-        };
+        std::vector<std::string> opts{};
         auto file_group_reader = new_file_group_reader_with_options(base_uri, 
opts);
 
         auto base_file_path = 
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet";
diff --git a/python/src/internal.rs b/python/src/internal.rs
index aac3658..fab4395 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -69,7 +69,10 @@ pub struct HudiFileGroupReader {
 impl HudiFileGroupReader {
     #[new]
     #[pyo3(signature = (base_uri, options=None))]
-    fn new(base_uri: &str, options: Option<HashMap<String, String>>) -> 
PyResult<Self> {
+    fn new_with_options(
+        base_uri: &str,
+        options: Option<HashMap<String, String>>,
+    ) -> PyResult<Self> {
         let inner = FileGroupReader::new_with_options(base_uri, 
options.unwrap_or_default())
             .map_err(PythonError::from)?;
         Ok(HudiFileGroupReader { inner })
diff --git a/python/tests/test_file_group_read.py 
b/python/tests/test_file_group_read.py
new file mode 100644
index 0000000..d68d645
--- /dev/null
+++ b/python/tests/test_file_group_read.py
@@ -0,0 +1,39 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import pyarrow as pa
+
+from hudi import HudiFileGroupReader
+
+
+def test_file_group_api_read_file_slice(get_sample_table):
+    table_path = get_sample_table
+    file_group_reader = HudiFileGroupReader(table_path)
+
+    batch = file_group_reader.read_file_slice_by_base_file_path(
+        
"san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet"
+    )
+
+    t = pa.Table.from_batches([batch]).select([0, 5, 6, 9]).sort_by("ts")
+    assert t.to_pylist() == [
+        {
+            "_hoodie_commit_time": "20240402123035233",
+            "ts": 1695159649087,
+            "uuid": "334e26e9-8355-45cc-97c6-c31daf0df330",
+            "fare": 19.1,
+        },
+    ]

Reply via email to