This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d16ce67 build(deps): upgrade datafusion & arrow, and restrict deps
upgrade to patch-level (#386)
d16ce67 is described below
commit d16ce6770a49fead91fe21814962951a4c34d706
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Jun 28 23:52:21 2025 -0500
build(deps): upgrade datafusion & arrow, and restrict deps upgrade to
patch-level (#386)
DataFusion 45.0 -> 46.0
Arrow 54.1 -> 54.2
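Adapt DataFusion API usage to the new version: the scan now builds a
ParquetSource wrapped in a DataSourceExec instead of using
ParquetExecBuilder. Update test cases to cover filter pushdown.
PartitionPruner now skips filters that cannot be converted against the
partition schema instead of returning an error. The arrow "pyarrow"
feature moves from the workspace dependency to python/Cargo.toml.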
---
Cargo.toml | 70 ++++++++++++++++----------------
crates/core/src/avro_to_arrow/README.md | 4 +-
crates/core/src/table/partition.rs | 6 +--
crates/datafusion/src/lib.rs | 34 +++++++++-------
crates/test/Cargo.toml | 4 +-
demo/apps/datafusion/Cargo.toml | 4 +-
demo/apps/hudi-table-api/rust/Cargo.toml | 4 +-
python/Cargo.toml | 2 +-
8 files changed, 66 insertions(+), 62 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 2e0f38b..2291648 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,50 +36,50 @@ repository = "https://github.com/apache/hudi-rs"
[workspace.dependencies]
# arrow
-arrow = { version = "~54.1.0", features = ["pyarrow"] }
-arrow-arith = { version = "~54.1.0" }
-arrow-array = { version = "~54.1.0" }
-arrow-buffer = { version = "~54.1.0" }
-arrow-cast = { version = "~54.1.0" }
-arrow-ipc = { version = "~54.1.0" }
-arrow-json = { version = "~54.1.0" }
-arrow-ord = { version = "~54.1.0" }
-arrow-row = { version = "~54.1.0" }
-arrow-schema = { version = "~54.1.0", features = ["serde"] }
-arrow-select = { version = "~54.1.0" }
+arrow = { version = "~54.2.0"}
+arrow-arith = { version = "~54.2.0" }
+arrow-array = { version = "~54.2.0" }
+arrow-buffer = { version = "~54.2.0" }
+arrow-cast = { version = "~54.2.0" }
+arrow-ipc = { version = "~54.2.0" }
+arrow-json = { version = "~54.2.0" }
+arrow-ord = { version = "~54.2.0" }
+arrow-row = { version = "~54.2.0" }
+arrow-schema = { version = "~54.2.0", features = ["serde"] }
+arrow-select = { version = "~54.2.0" }
object_store = { version = "~0.11.2", features = ["aws", "azure", "gcp"] }
-parquet = { version = "~54.1.0", features = ["async", "object_store"] }
+parquet = { version = "~54.2.0", features = ["async", "object_store"] }
# avro
-apache-avro = { version = "0.17.0" }
+apache-avro = { version = "~0.17.0" }
# datafusion
-datafusion = { version = "~45.0.0" }
-datafusion-expr = { version = "~45.0.0" }
-datafusion-common = { version = "~45.0.0" }
-datafusion-physical-expr = { version = "~45.0.0" }
+datafusion = { version = "~46.0.0" }
+datafusion-expr = { version = "~46.0.0" }
+datafusion-common = { version = "~46.0.0" }
+datafusion-physical-expr = { version = "~46.0.0" }
# serde
-percent-encoding = { version = "2.3.1" }
-serde = { version = "1.0", features = ["derive"] }
-serde_json = { version = "1.0" }
+percent-encoding = { version = "~2.3.1" }
+serde = { version = "~1.0", features = ["derive"] }
+serde_json = { version = "~1.0" }
# "stdlib"
-thiserror = { version = "2.0.11" }
-bytes = { version = "1" }
+thiserror = { version = "~2.0.11" }
+bytes = { version = "~1.10" }
chrono = { version = "=0.4.39" }
-lazy_static = { version = "1.5.0" }
-log = { version = "0.4" }
-num-traits = { version = "0.2" }
-once_cell = { version = "1.21.3" }
-paste = { version = "1.0.15" }
-strum = { version = "0.27.0", features = ["derive"] }
-strum_macros = "0.27.0"
-url = { version = "2.5" }
+lazy_static = { version = "~1.5.0" }
+log = { version = "~0.4" }
+num-traits = { version = "~0.2" }
+once_cell = { version = "~1.21.3" }
+paste = { version = "~1.0.15" }
+strum = { version = "~0.27.0", features = ["derive"] }
+strum_macros = "~0.27.0"
+url = { version = "~2.5.4" }
# runtime / async
-async-recursion = { version = "1.1.1" }
-async-trait = { version = "0.1" }
-dashmap = { version = "6.1" }
-futures = { version = "0.3" }
-tokio = { version = "1", features = ["rt-multi-thread"] }
+async-recursion = { version = "~1.1.1" }
+async-trait = { version = "~0.1" }
+dashmap = { version = "~6.1" }
+futures = { version = "~0.3" }
+tokio = { version = "~1.45", features = ["rt-multi-thread"] }
diff --git a/crates/core/src/avro_to_arrow/README.md
b/crates/core/src/avro_to_arrow/README.md
index 2155382..7449041 100644
--- a/crates/core/src/avro_to_arrow/README.md
+++ b/crates/core/src/avro_to_arrow/README.md
@@ -19,7 +19,7 @@
> [!NOTE]
> This module is taken
-> from [Apache
DataFusion](https://github.com/apache/datafusion/tree/45.0.0/datafusion/core/src/datasource/avro_to_arrow)
+> from [Apache
DataFusion](https://github.com/apache/datafusion/tree/46.0.1/datafusion/core/src/datasource/avro_to_arrow)
> and modified to work with Hudi Avro log block. The original code is licensed
> under the Apache License, Version 2.0.
## Notable Changes
@@ -27,4 +27,4 @@
- Removed `reader.rs`.
- Original tests are removed.
- DataFusion errors are replaced with Hudi errors.
-- Adjust `AvroArrowArrayReader` to work with Iterator of Avro Values.
\ No newline at end of file
+- Adjust `AvroArrowArrayReader` to work with Iterator of Avro Values.
diff --git a/crates/core/src/table/partition.rs
b/crates/core/src/table/partition.rs
index 307b428..0378955 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -63,10 +63,10 @@ impl PartitionPruner {
partition_schema: &Schema,
hudi_configs: &HudiConfigs,
) -> Result<Self> {
- let and_filters = and_filters
+ let and_filters: Vec<SchemableFilter> = and_filters
.iter()
- .map(|filter| SchemableFilter::try_from((filter.clone(),
partition_schema)))
- .collect::<Result<Vec<SchemableFilter>>>()?;
+ .filter_map(|filter| SchemableFilter::try_from((filter.clone(),
partition_schema)).ok())
+ .collect();
let schema = Arc::new(partition_schema.clone());
let is_hive_style: bool = hudi_configs
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index b22a30d..60778a1 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -30,8 +30,9 @@ use async_trait::async_trait;
use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
+use datafusion::datasource::physical_plan::parquet::source::ParquetSource;
use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion::datasource::source::DataSourceExec;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::Operator;
@@ -202,26 +203,29 @@ impl TableProvider for HudiDataSource {
let base_url = self.table.base_url();
let url = ObjectStoreUrl::parse(get_scheme_authority(&base_url))?;
- let fsc = FileScanConfig::new(url, self.schema())
- .with_file_groups(parquet_file_groups)
- .with_projection(projection.cloned())
- .with_limit(limit);
let parquet_opts = TableParquetOptions {
global: state.config_options().execution.parquet.clone(),
column_specific_options: Default::default(),
key_value_metadata: Default::default(),
};
- let mut exec_builder = ParquetExecBuilder::new_with_options(fsc,
parquet_opts);
-
+ let table_schema = self.schema();
+ let mut parquet_source = ParquetSource::new(parquet_opts);
let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
if let Some(expr) = filter {
- let df_schema = DFSchema::try_from(self.schema())?;
+ let df_schema = DFSchema::try_from(table_schema.clone())?;
let predicate = create_physical_expr(&expr, &df_schema,
state.execution_props())?;
- exec_builder = exec_builder.with_predicate(predicate)
+ parquet_source =
parquet_source.with_predicate(table_schema.clone(), predicate)
}
- Ok(exec_builder.build_arc())
+ let fsc = Arc::new(
+ FileScanConfig::new(url, table_schema, Arc::new(parquet_source))
+ .with_file_groups(parquet_file_groups)
+ .with_projection(projection.cloned())
+ .with_limit(limit),
+ );
+
+ Ok(Arc::new(DataSourceExec::new(fsc)))
}
fn supports_filters_pushdown(
@@ -467,7 +471,7 @@ mod tests {
table_name
)));
assert!(plan_lines[4].starts_with(
- "FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND
get_field(structField@3, field2) > 30"
+ "FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND name@1 != Alice AND
get_field(structField@3, field2) > 30"
));
assert!(plan_lines[5].contains(&format!("input_partitions={}",
planned_input_partitioned)));
}
@@ -486,7 +490,7 @@ mod tests {
}
#[tokio::test]
- async fn test_datafusion_read_hudi_table() {
+ async fn test_datafusion_read_hudi_table_with_partition_filter_pushdown() {
for (test_table, use_sql, planned_input_partitions) in &[
(V6ComplexkeygenHivestyle, true, 2),
(V6Nonpartitioned, true, 1),
@@ -503,7 +507,7 @@ mod tests {
let sql = format!(
r#"
SELECT id, name, isActive, structField.field2
- FROM {} WHERE id % 2 = 0
+ FROM {} WHERE id % 2 = 0 AND name != 'Alice'
AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
test_table.as_ref()
);
@@ -527,7 +531,7 @@ mod tests {
}
#[tokio::test]
- async fn test_datafusion_read_hudi_table_with_replacecommits() {
+ async fn
test_datafusion_read_hudi_table_with_replacecommits_with_partition_filter_pushdown()
{
for (test_table, use_sql, planned_input_partitions) in
&[(V6SimplekeygenNonhivestyleOverwritetable, true, 1)]
{
@@ -540,7 +544,7 @@ mod tests {
let sql = format!(
r#"
SELECT id, name, isActive, structField.field2
- FROM {} WHERE id % 2 = 0
+ FROM {} WHERE id % 2 = 0 AND name != 'Alice'
AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
test_table.as_ref()
);
diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml
index 1562097..1f129d3 100644
--- a/crates/test/Cargo.toml
+++ b/crates/test/Cargo.toml
@@ -38,5 +38,5 @@ strum_macros = { workspace = true }
url = { workspace = true }
# testing
-tempfile = "3"
-zip-extract = "0.3"
+tempfile = "3.20.0"
+zip-extract = "0.3.0"
diff --git a/demo/apps/datafusion/Cargo.toml b/demo/apps/datafusion/Cargo.toml
index 4938de9..3bd0064 100644
--- a/demo/apps/datafusion/Cargo.toml
+++ b/demo/apps/datafusion/Cargo.toml
@@ -24,6 +24,6 @@ version = "0.1.0"
edition = "2021"
[dependencies]
-tokio = "^1"
-datafusion = "~45.0.0"
+tokio = "~1.45"
+datafusion = "~46.0.0"
hudi = { path = "../../../crates/hudi", features = ["datafusion"] }
diff --git a/demo/apps/hudi-table-api/rust/Cargo.toml
b/demo/apps/hudi-table-api/rust/Cargo.toml
index 39139a2..d3164d7 100644
--- a/demo/apps/hudi-table-api/rust/Cargo.toml
+++ b/demo/apps/hudi-table-api/rust/Cargo.toml
@@ -24,7 +24,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
-tokio = "^1"
-arrow = { version = "~54.1.0", features = ["pyarrow"] }
+tokio = "~1.45"
+arrow = { version = "~54.2.0" }
hudi = { path = "../../../../crates/hudi" }
diff --git a/python/Cargo.toml b/python/Cargo.toml
index c67147f..3914b96 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -35,7 +35,7 @@ doc = false
[dependencies]
hudi = { path = "../crates/hudi" }
# arrow
-arrow = { workspace = true }
+arrow = { workspace = true, features = ["pyarrow"] }
# "stdlib"
thiserror = { workspace = true }