This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1f0232c995 Reattach parquet metadata cache after deserializing in
datafusion-proto (#20574)
1f0232c995 is described below
commit 1f0232c995111e2a9b8d58228ca604c804d0a879
Author: nathan <[email protected]>
AuthorDate: Thu Mar 5 02:10:34 2026 -0500
Reattach parquet metadata cache after deserializing in datafusion-proto
(#20574)
- Addressing: https://github.com/apache/datafusion/issues/20575
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/proto/src/physical_plan/mod.rs | 20 ++++++--
.../proto/tests/cases/roundtrip_physical_plan.rs | 58 +++++++++++++++++++++-
2 files changed, 73 insertions(+), 5 deletions(-)
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index bfba715b91..fce8ac658e 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -42,9 +42,13 @@ use datafusion_datasource_csv::source::CsvSource;
use datafusion_datasource_json::file_format::JsonSink;
use datafusion_datasource_json::source::JsonSource;
#[cfg(feature = "parquet")]
+use datafusion_datasource_parquet::CachedParquetFileReaderFactory;
+#[cfg(feature = "parquet")]
use datafusion_datasource_parquet::file_format::ParquetSink;
#[cfg(feature = "parquet")]
use datafusion_datasource_parquet::source::ParquetSource;
+#[cfg(feature = "parquet")]
+use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{FunctionRegistry, TaskContext};
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use datafusion_functions_table::generate_series::{
@@ -847,9 +851,19 @@ impl protobuf::PhysicalPlanNode {
// Parse table schema with partition columns
let table_schema = parse_table_schema_from_proto(base_conf)?;
-
- let mut source =
- ParquetSource::new(table_schema).with_table_parquet_options(options);
+ let object_store_url = match base_conf.object_store_url.is_empty() {
+ false => ObjectStoreUrl::parse(&base_conf.object_store_url)?,
+ true => ObjectStoreUrl::local_filesystem(),
+ };
+ let store = ctx.runtime_env().object_store(object_store_url)?;
+ let metadata_cache =
+ ctx.runtime_env().cache_manager.get_file_metadata_cache();
+ let reader_factory =
+ Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
+
+ let mut source = ParquetSource::new(table_schema)
+ .with_parquet_file_reader_factory(reader_factory)
+ .with_table_parquet_options(options);
if let Some(predicate) = predicate {
source = source.with_predicate(predicate);
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 230727c8c1..ccee240b94 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -36,8 +36,9 @@ use datafusion::datasource::listing::{
};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
- ArrowSource, FileGroup, FileOutputMode, FileScanConfigBuilder, FileSinkConfig,
- ParquetSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
+ ArrowSource, FileGroup, FileOutputMode, FileScanConfig, FileScanConfigBuilder,
+ FileSinkConfig, ParquetSource, wrap_partition_type_in_dict,
+ wrap_partition_value_in_dict,
};
use datafusion::datasource::sink::DataSinkExec;
use datafusion::datasource::source::DataSourceExec;
@@ -929,6 +930,59 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
roundtrip_test(DataSourceExec::from_data_source(scan_config))
}
+#[test]
+fn roundtrip_parquet_exec_attaches_cached_reader_factory_after_roundtrip() -> Result<()> {
+ let file_schema =
+ Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
+ let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema)));
+ let scan_config =
+ FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
+ .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
+ "/path/to/file.parquet".to_string(),
+ 1024,
+ )])])
+ .with_statistics(Statistics {
+ num_rows: Precision::Inexact(100),
+ total_byte_size: Precision::Inexact(1024),
+ column_statistics: Statistics::unknown_column(&file_schema),
+ })
+ .build();
+ let exec_plan = DataSourceExec::from_data_source(scan_config);
+
+ let ctx = SessionContext::new();
+ let codec = DefaultPhysicalExtensionCodec {};
+ let proto_converter = DefaultPhysicalProtoConverter {};
+ let roundtripped =
+ roundtrip_test_and_return(exec_plan, &ctx, &codec, &proto_converter)?;
+
+ let data_source = roundtripped
+ .as_any()
+ .downcast_ref::<DataSourceExec>()
+ .ok_or_else(|| {
+ internal_datafusion_err!("Expected DataSourceExec after roundtrip")
+ })?;
+ let file_scan = data_source
+ .data_source()
+ .as_any()
+ .downcast_ref::<FileScanConfig>()
+ .ok_or_else(|| {
+ internal_datafusion_err!("Expected FileScanConfig after roundtrip")
+ })?;
+ let parquet_source = file_scan
+ .file_source()
+ .as_any()
+ .downcast_ref::<ParquetSource>()
+ .ok_or_else(|| {
+ internal_datafusion_err!("Expected ParquetSource after roundtrip")
+ })?;
+
+ assert!(
+ parquet_source.parquet_file_reader_factory().is_some(),
+ "Parquet reader factory should be attached after decoding from protobuf"
+ );
+ Ok(())
+}
+
#[test]
fn roundtrip_arrow_scan() -> Result<()> {
let file_schema =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]