This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new aa5bd8e fix: resolve env vars for creating standalone file group reader (#345)
aa5bd8e is described below
commit aa5bd8e2565a09a7702678a6983542652b151ff0
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Jun 2 17:22:00 2025 -0500
fix: resolve env vars for creating standalone file group reader (#345)
---
crates/core/src/file_group/reader.rs | 117 ++++++++++++++++-----
crates/core/src/table/builder.rs | 43 +++++---
crates/core/src/table/mod.rs | 2 +-
.../.hoodie/hoodie.properties | 26 +++++
demo/apps/hudi-file-group-api/cpp/main.cpp | 11 +-
python/src/internal.rs | 5 +-
python/tests/test_file_group_read.py | 39 +++++++
7 files changed, 194 insertions(+), 49 deletions(-)
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index 5d79ce8..d94c9d1 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -27,6 +27,7 @@ use crate::file_group::log_file::scanner::LogFileScanner;
use crate::merge::record_merger::RecordMerger;
use crate::metadata::meta_field::MetaField;
use crate::storage::Storage;
+use crate::table::builder::OptionResolver;
use crate::timeline::selector::InstantRange;
use crate::Result;
use arrow::compute::and;
@@ -44,16 +45,20 @@ pub struct FileGroupReader {
}
impl FileGroupReader {
- pub(crate) fn new_with_configs_and_options<I, K, V>(
+ /// Creates a new reader with the given Hudi configurations and
overwriting options.
+ ///
+ /// # Notes
+ /// This API does **not** use [`OptionResolver`] that loads table
properties from storage to resolve options.
+ pub(crate) fn new_with_configs_and_overwriting_options<I, K, V>(
hudi_configs: Arc<HudiConfigs>,
- options: I,
+ overwriting_options: I,
) -> Result<Self>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
- let (hudi_opts, others) = split_hudi_options_from_others(options);
+ let (hudi_opts, others) =
split_hudi_options_from_others(overwriting_options);
let mut final_opts = hudi_configs.as_options();
final_opts.extend(hudi_opts);
@@ -71,18 +76,30 @@ impl FileGroupReader {
/// # Arguments
/// * `base_uri` - The base URI of the file group's residing table.
/// * `options` - Additional options for the reader.
+ ///
+ /// # Notes
+ /// This API uses [`OptionResolver`] that loads table properties from
storage to resolve options.
pub fn new_with_options<I, K, V>(base_uri: &str, options: I) ->
Result<Self>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
- let hudi_configs = Arc::new(HudiConfigs::new([(
- HudiTableConfig::BasePath.as_ref().to_string(),
- base_uri.to_string(),
- )]));
-
- Self::new_with_configs_and_options(hudi_configs, options)
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async {
+ let mut resolver = OptionResolver::new_with_options(base_uri,
options);
+ resolver.resolve_options().await?;
+ let hudi_configs =
Arc::new(HudiConfigs::new(resolver.hudi_options));
+ let storage =
+ Storage::new(Arc::new(resolver.storage_options),
hudi_configs.clone())?;
+
+ Ok(Self {
+ hudi_configs,
+ storage,
+ })
+ })
}
fn create_filtering_mask_for_base_file_records(
@@ -251,30 +268,79 @@ mod tests {
use super::*;
use crate::config::util::empty_options;
use crate::error::CoreError;
+ use crate::Result;
use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
+ use std::fs::canonicalize;
+ use std::path::PathBuf;
use std::sync::Arc;
+ use url::Url;
+
+ fn get_non_existent_base_uri() -> String {
+ "file:///non-existent-path/table".to_string()
+ }
+
+ fn get_base_uri_with_valid_props() -> String {
+ let url = Url::from_file_path(
+ canonicalize(
+ PathBuf::from("tests")
+ .join("data")
+ .join("table_props_valid"),
+ )
+ .unwrap(),
+ )
+ .unwrap();
+ url.as_ref().to_string()
+ }
+
+ fn get_base_uri_with_valid_props_minimum() -> String {
+ let url = Url::from_file_path(
+ canonicalize(
+ PathBuf::from("tests")
+ .join("data")
+ .join("table_props_valid_minimum"),
+ )
+ .unwrap(),
+ )
+ .unwrap();
+ url.as_ref().to_string()
+ }
+
+ fn get_base_uri_with_invalid_props() -> String {
+ let url = Url::from_file_path(
+ canonicalize(
+ PathBuf::from("tests")
+ .join("data")
+ .join("table_props_invalid"),
+ )
+ .unwrap(),
+ )
+ .unwrap();
+ url.as_ref().to_string()
+ }
#[test]
- fn test_new_with_options() -> Result<()> {
+ fn test_new_with_options() {
let options = vec![("key1", "value1"), ("key2", "value2")];
- let reader = FileGroupReader::new_with_options("/tmp/hudi_data",
options)?;
+ let base_uri = get_base_uri_with_valid_props();
+ let reader = FileGroupReader::new_with_options(&base_uri,
options).unwrap();
assert!(!reader.storage.options.is_empty());
assert!(reader
.storage
.hudi_configs
.contains(HudiTableConfig::BasePath));
- Ok(())
}
#[test]
- fn test_read_file_slice_returns_error() {
- let reader =
-
FileGroupReader::new_with_options("file:///non-existent-path/table",
empty_options())
- .unwrap();
- let result =
reader.read_file_slice_by_base_file_path_blocking("non_existent_file");
- assert!(matches!(result.unwrap_err(), ReadFileSliceError(_)));
+ fn test_new_with_options_invalid_base_uri_or_invalid_props() {
+ let base_uri = get_non_existent_base_uri();
+ let result = FileGroupReader::new_with_options(&base_uri,
empty_options());
+ assert!(result.is_err());
+
+ let base_uri = get_base_uri_with_invalid_props();
+ let result = FileGroupReader::new_with_options(&base_uri,
empty_options());
+ assert!(result.is_err())
}
fn create_test_record_batch() -> Result<RecordBatch> {
@@ -296,11 +362,12 @@ mod tests {
#[test]
fn test_create_filtering_mask_for_base_file_records() -> Result<()> {
- let base_uri = "file:///non-existent-path/table";
+ let base_uri = get_base_uri_with_valid_props_minimum();
let records = create_test_record_batch()?;
- // Test case 1: No meta fields populated
+
+ // Test case 1: Disable populating the meta fields
let reader = FileGroupReader::new_with_options(
- base_uri,
+ &base_uri,
[
(HudiTableConfig::PopulatesMetaFields.as_ref(), "false"),
(HudiReadConfig::FileGroupStartTimestamp.as_ref(), "2"),
@@ -310,13 +377,13 @@ mod tests {
assert_eq!(mask, None, "Commit time filtering should not be needed");
// Test case 2: No commit time filtering options
- let reader = FileGroupReader::new_with_options(base_uri,
empty_options())?;
+ let reader = FileGroupReader::new_with_options(&base_uri,
empty_options())?;
let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
assert_eq!(mask, None);
// Test case 3: Filtering commit time > '2'
let reader = FileGroupReader::new_with_options(
- base_uri,
+ &base_uri,
[(HudiReadConfig::FileGroupStartTimestamp, "2")],
)?;
let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
@@ -328,7 +395,7 @@ mod tests {
// Test case 4: Filtering commit time <= '4'
let reader = FileGroupReader::new_with_options(
- base_uri,
+ &base_uri,
[(HudiReadConfig::FileGroupEndTimestamp, "4")],
)?;
let mask =
reader.create_filtering_mask_for_base_file_records(&records)?;
@@ -336,7 +403,7 @@ mod tests {
// Test case 5: Filtering commit time > '2' and <= '4'
let reader = FileGroupReader::new_with_options(
- base_uri,
+ &base_uri,
[
(HudiReadConfig::FileGroupStartTimestamp, "2"),
(HudiReadConfig::FileGroupEndTimestamp, "4"),
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index ebb72e2..d123126 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -43,10 +43,10 @@ pub struct TableBuilder {
/// Resolver for options including Hudi options, storage options, and generic
options.
#[derive(Debug, Clone)]
pub struct OptionResolver {
- base_uri: String,
- hudi_options: HashMap<String, String>,
- storage_options: HashMap<String, String>,
- options: HashMap<String, String>,
+ pub base_uri: String,
+ pub hudi_options: HashMap<String, String>,
+ pub storage_options: HashMap<String, String>,
+ pub options: HashMap<String, String>,
}
macro_rules! impl_with_options {
@@ -106,11 +106,7 @@ impl TableBuilder {
option_resolver.resolve_options().await?;
- let hudi_configs =
HudiConfigs::new(self.option_resolver.hudi_options.iter());
-
- validate_configs(&hudi_configs).expect("Hudi configs are not valid.");
-
- let hudi_configs = Arc::from(hudi_configs);
+ let hudi_configs =
Arc::from(HudiConfigs::new(option_resolver.hudi_options.iter()));
let storage_options =
Arc::from(self.option_resolver.storage_options.clone());
@@ -140,6 +136,25 @@ impl OptionResolver {
}
}
+ /// Create a new [OptionResolver] with the given base URI and options.
+ pub fn new_with_options<I, K, V>(base_uri: &str, options: I) -> Self
+ where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+ {
+ let options = options
+ .into_iter()
+ .map(|(k, v)| (k.as_ref().to_string(), v.into()))
+ .collect();
+ Self {
+ base_uri: base_uri.to_string(),
+ hudi_options: HashMap::new(),
+ storage_options: HashMap::new(),
+ options,
+ }
+ }
+
/// Resolve all options by combining the ones from hoodie.properties,
user-provided options,
/// env vars, and global Hudi configs. The precedence order is as follows:
///
@@ -150,16 +165,20 @@ impl OptionResolver {
/// 5. Global Hudi configs
///
/// [note] Error may occur when 1 and 2 have conflicts.
- async fn resolve_options(&mut self) -> Result<()> {
+ pub async fn resolve_options(&mut self) -> Result<()> {
self.resolve_user_provided_options();
- // if any user-provided options are intended for cloud storage and in
uppercase,
+ // If any user-provided options are intended for cloud storage and in
uppercase,
// convert them to lowercase. This is to allow `object_store` to pick
them up.
self.resolve_cloud_env_vars();
// At this point, we have resolved the storage options needed for
accessing the storage layer.
// We can now resolve the hudi options
- self.resolve_hudi_options().await
+ self.resolve_hudi_options().await?;
+
+ // Validate the resolved Hudi options
+ let hudi_configs = HudiConfigs::new(self.hudi_options.iter());
+ validate_configs(&hudi_configs)
}
fn resolve_user_provided_options(&mut self) {
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 7cb72da..ad1c89b 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -541,7 +541,7 @@ impl Table {
for (k, v) in options {
overwriting_options.insert(k.as_ref().to_string(), v.into());
}
- FileGroupReader::new_with_configs_and_options(
+ FileGroupReader::new_with_configs_and_overwriting_options(
self.hudi_configs.clone(),
overwriting_options,
)
diff --git a/crates/core/tests/data/table_props_valid_minimum/.hoodie/hoodie.properties b/crates/core/tests/data/table_props_valid_minimum/.hoodie/hoodie.properties
new file mode 100644
index 0000000..2c2fdbf
--- /dev/null
+++ b/crates/core/tests/data/table_props_valid_minimum/.hoodie/hoodie.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.table.type=copy_on_write
+hoodie.table.checksum=3761586722
+hoodie.table.recordkey.fields=uuid
+hoodie.table.name=trips
+hoodie.table.version=6
+hoodie.table.timeline.timezone=local
+hoodie.timeline.layout.version=1
diff --git a/demo/apps/hudi-file-group-api/cpp/main.cpp b/demo/apps/hudi-file-group-api/cpp/main.cpp
index ff58b6b..568725e 100644
--- a/demo/apps/hudi-file-group-api/cpp/main.cpp
+++ b/demo/apps/hudi-file-group-api/cpp/main.cpp
@@ -138,16 +138,7 @@ int main() {
auto base_uri = "s3://hudi-demo/cow/v6_nonpartitioned";
- const char* aws_access_key_id = std::getenv("AWS_ACCESS_KEY_ID");
- const char* aws_secret_access_key =
std::getenv("AWS_SECRET_ACCESS_KEY");
-
- std::vector<std::string> opts{
- "aws_access_key_id=" + std::string(aws_access_key_id),
- "aws_secret_access_key=" + std::string(aws_secret_access_key),
- "aws_endpoint_url=http://minio:9000",
- "aws_allow_http=true",
- "aws_region=us-east-1"
- };
+ std::vector<std::string> opts{};
auto file_group_reader = new_file_group_reader_with_options(base_uri,
opts);
auto base_file_path =
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet";
diff --git a/python/src/internal.rs b/python/src/internal.rs
index aac3658..fab4395 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -69,7 +69,10 @@ pub struct HudiFileGroupReader {
impl HudiFileGroupReader {
#[new]
#[pyo3(signature = (base_uri, options=None))]
- fn new(base_uri: &str, options: Option<HashMap<String, String>>) ->
PyResult<Self> {
+ fn new_with_options(
+ base_uri: &str,
+ options: Option<HashMap<String, String>>,
+ ) -> PyResult<Self> {
let inner = FileGroupReader::new_with_options(base_uri,
options.unwrap_or_default())
.map_err(PythonError::from)?;
Ok(HudiFileGroupReader { inner })
diff --git a/python/tests/test_file_group_read.py b/python/tests/test_file_group_read.py
new file mode 100644
index 0000000..d68d645
--- /dev/null
+++ b/python/tests/test_file_group_read.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyarrow as pa
+
+from hudi import HudiFileGroupReader
+
+
+def test_file_group_api_read_file_slice(get_sample_table):
+ table_path = get_sample_table
+ file_group_reader = HudiFileGroupReader(table_path)
+
+ batch = file_group_reader.read_file_slice_by_base_file_path(
+
"san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet"
+ )
+
+ t = pa.Table.from_batches([batch]).select([0, 5, 6, 9]).sort_by("ts")
+ assert t.to_pylist() == [
+ {
+ "_hoodie_commit_time": "20240402123035233",
+ "ts": 1695159649087,
+ "uuid": "334e26e9-8355-45cc-97c6-c31daf0df330",
+ "fare": 19.1,
+ },
+ ]