alamb commented on code in PR #20481:
URL: https://github.com/apache/datafusion/pull/20481#discussion_r2849888314
##########
datafusion-examples/examples/custom_data_source/custom_datasource.rs:
##########
@@ -62,8 +62,9 @@ async fn search_accounts(
expected_result_length: usize,
) -> Result<()> {
// create local execution context
- let ctx = SessionContext::new();
-
+ let config = SessionConfig::new()
Review Comment:
This seems like a strange thing to set in an example -- maybe we could just
update the expected results in the example, or add a comment to explain how
it works
##########
datafusion-examples/examples/custom_data_source/csv_json_opener.rs:
##########
@@ -80,8 +80,13 @@ async fn csv_opener() -> Result<()> {
.create_file_opener(object_store, &scan_config, 0)?;
let mut result = vec![];
- let mut stream =
- FileStream::new(&scan_config, 0, opener,
&ExecutionPlanMetricsSet::new())?;
+ let mut stream = FileStream::new(
+ &scan_config,
+ 0,
+ opener,
+ &ExecutionPlanMetricsSet::new(),
+ None,
+ )?;
Review Comment:
We might be able to avoid the API changes by making more of a builder style
API
```rust
let mut stream = FileStream::new(&scan_config, 0, opener,
&ExecutionPlanMetricsSet::new())?
.with_shared_queue(None);
```
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -180,7 +195,207 @@ impl PreparedAccessPlan {
}
}
+impl ParquetOpener {
+ fn build_row_group_access_filter(
+ file_name: &str,
+ extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+ row_group_count: usize,
+ row_group_metadata: &[RowGroupMetaData],
+ file_range: Option<&FileRange>,
+ stats_pruning: Option<RowGroupStatisticsPruningContext<'_>>,
+ ) -> Result<RowGroupAccessPlanFilter> {
+ let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
+ file_name,
+ extensions,
+ row_group_count,
+ )?);
+
+ if let Some(range) = file_range {
+ row_groups.prune_by_range(row_group_metadata, range);
+ }
+
+ if let Some(stats_pruning) = stats_pruning {
+ row_groups.prune_by_statistics(
+ stats_pruning.physical_file_schema.as_ref(),
+ stats_pruning.parquet_schema,
+ row_group_metadata,
+ stats_pruning.predicate,
+ stats_pruning.file_metrics,
+ );
+ }
+
+ Ok(row_groups)
+ }
+}
+
impl FileOpener for ParquetOpener {
+ fn morselize(
+ &self,
+ partitioned_file: PartitionedFile,
+ ) -> BoxFuture<'static, Result<Vec<PartitionedFile>>> {
+ if partitioned_file
+ .extensions
+ .as_ref()
+ .map(|e| e.is::<ParquetMorsel>())
+ .unwrap_or(false)
+ {
+ return Box::pin(ready(Ok(vec![partitioned_file])));
+ }
+
+ let file_metrics = ParquetFileMetrics::new(
+ self.partition_index,
+ partitioned_file.object_meta.location.as_ref(),
+ &self.metrics,
+ );
+ let file_name = partitioned_file.object_meta.location.to_string();
+ let file_range = partitioned_file.range.clone();
+ let extensions = partitioned_file.extensions.clone();
+
+ let metadata_size_hint = partitioned_file
+ .metadata_size_hint
+ .or(self.metadata_size_hint);
+
+ let mut async_file_reader: Box<dyn AsyncFileReader> =
+ match self.parquet_file_reader_factory.create_reader(
+ self.partition_index,
+ partitioned_file.clone(),
+ metadata_size_hint,
+ &self.metrics,
+ ) {
+ Ok(reader) => reader,
+ Err(e) => return Box::pin(ready(Err(e))),
+ };
+
+ let options = ArrowReaderOptions::new().with_page_index(false);
+ #[cfg(feature = "parquet_encryption")]
+ let encryption_context = self.get_encryption_context();
+
+ let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);
+ let table_schema = self.table_schema.clone();
+ let predicate = self.predicate.clone();
+ let metrics = self.metrics.clone();
+ let enable_row_group_stats_pruning =
self.enable_row_group_stats_pruning;
+ let limit = self.limit;
+ let preserve_order = self.preserve_order;
+
+ Box::pin(async move {
+ #[cfg(feature = "parquet_encryption")]
+ let options = if let Some(fd_val) = encryption_context
+
.get_file_decryption_properties(&partitioned_file.object_meta.location)
+ .await?
+ {
+ options.with_file_decryption_properties(Arc::clone(&fd_val))
+ } else {
+ options
+ };
+
+ let predicate_creation_errors = MetricBuilder::new(&metrics)
+ .global_counter("num_predicate_creation_errors");
+
+ // Step: try to prune the file using file-level statistics before
loading
+ // parquet metadata. This avoids the I/O cost of reading metadata
when
+ // file-level stats (available from the catalog) indicate no rows
can match.
+ if let Some(pred) = predicate.as_ref() {
+ let logical_file_schema =
Arc::clone(table_schema.file_schema());
+ if let Some(mut file_pruner) = FilePruner::try_new(
+ Arc::clone(pred),
+ &logical_file_schema,
+ &partitioned_file,
+ predicate_creation_errors.clone(),
+ ) && file_pruner.should_prune()?
+ {
+ file_metrics.files_ranges_pruned_statistics.add_pruned(1);
+ return Ok(vec![]);
+ }
+ }
+
+ let mut _metadata_timer = file_metrics.metadata_load_time.timer();
+ let reader_metadata =
+ ArrowReaderMetadata::load_async(&mut async_file_reader,
options).await?;
+ let metadata = reader_metadata.metadata();
+ let num_row_groups = metadata.num_row_groups();
+
+ // Adapt the physical schema to the file schema for pruning
+ let physical_file_schema = Arc::clone(reader_metadata.schema());
+ let logical_file_schema = table_schema.file_schema();
+ let rewriter = expr_adapter_factory.create(
+ Arc::clone(logical_file_schema),
+ Arc::clone(&physical_file_schema),
+ )?;
+ let simplifier =
PhysicalExprSimplifier::new(&physical_file_schema);
+
+ // Replace partition column references with their literal values
before rewriting.
Review Comment:
we should probably factor this out into a common place so it can be shared
##########
datafusion-examples/examples/data_io/json_shredding.rs:
##########
@@ -93,6 +93,7 @@ pub async fn json_shredding() -> Result<()> {
// Set up query execution
let mut cfg = SessionConfig::new();
cfg.options_mut().execution.parquet.pushdown_filters = true;
Review Comment:
I personally think we should update the example to not need this setting --
this can be a follow on PR
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -180,7 +195,207 @@ impl PreparedAccessPlan {
}
}
+impl ParquetOpener {
+ fn build_row_group_access_filter(
+ file_name: &str,
+ extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+ row_group_count: usize,
+ row_group_metadata: &[RowGroupMetaData],
+ file_range: Option<&FileRange>,
+ stats_pruning: Option<RowGroupStatisticsPruningContext<'_>>,
+ ) -> Result<RowGroupAccessPlanFilter> {
+ let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
+ file_name,
+ extensions,
+ row_group_count,
+ )?);
+
+ if let Some(range) = file_range {
+ row_groups.prune_by_range(row_group_metadata, range);
+ }
+
+ if let Some(stats_pruning) = stats_pruning {
+ row_groups.prune_by_statistics(
+ stats_pruning.physical_file_schema.as_ref(),
+ stats_pruning.parquet_schema,
+ row_group_metadata,
+ stats_pruning.predicate,
+ stats_pruning.file_metrics,
+ );
+ }
+
+ Ok(row_groups)
+ }
+}
+
impl FileOpener for ParquetOpener {
+ fn morselize(
Review Comment:
I think it would help here / somewhere to document the design / intent of
`morselize` as comments -- we can probably start from the PR description. I can
help with this
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -2459,4 +2462,152 @@ mod tests {
assert_eq!(calls.len(), 2);
assert_eq!(calls, vec![Some(123), Some(456)]);
}
+
+ #[tokio::test]
+ async fn parquet_morsel_driven_execution() -> Result<()> {
+ let store =
+ Arc::new(object_store::memory::InMemory::new()) as Arc<dyn
ObjectStore>;
+ let store_url = ObjectStoreUrl::parse("memory://test").unwrap();
+
+ let ctx = SessionContext::new();
+ ctx.register_object_store(store_url.as_ref(), store.clone());
+
+ // Create a Parquet file with 100 row groups, each with 10 rows
Review Comment:
It would help me grok/understand this test better if the mechanics were
refactored out:
1. The setup were in a separate function (e.g. `fn input_file() -> Vec<u8>` or
something similar)
2. The code that drove a partition to completion was in its own function
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -2459,4 +2462,152 @@ mod tests {
assert_eq!(calls.len(), 2);
assert_eq!(calls, vec![Some(123), Some(456)]);
}
+
+ #[tokio::test]
+ async fn parquet_morsel_driven_execution() -> Result<()> {
+ let store =
+ Arc::new(object_store::memory::InMemory::new()) as Arc<dyn
ObjectStore>;
+ let store_url = ObjectStoreUrl::parse("memory://test").unwrap();
+
+ let ctx = SessionContext::new();
+ ctx.register_object_store(store_url.as_ref(), store.clone());
+
+ // Create a Parquet file with 100 row groups, each with 10 rows
+ let schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int32, false)]));
+
+ let mut out = Vec::new();
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(10)
+ .build();
+ {
+ let mut writer =
+ ArrowWriter::try_new(&mut out, Arc::clone(&schema),
Some(props))?;
+ // Write many batches to ensure they are not coalesced and we can
verify work distribution
+ for i in 0..100 {
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(Int32Array::from(vec![i; 10]))],
+ )?;
+ writer.write(&batch)?;
+ }
+ writer.close()?;
+ }
+
+ let path = Path::from("skewed.parquet");
+ store.put(&path, out.into()).await?;
+ let meta = store.head(&path).await?;
+
+ // Set up DataSourceExec with 2 partitions, but the file is only in
partition 0 (skewed)
+ let source = Arc::new(ParquetSource::new(schema));
+ let config = FileScanConfigBuilder::new(store_url, source)
+
.with_file_group(FileGroup::new(vec![PartitionedFile::new_from_meta(meta)]))
+ .with_file_group(FileGroup::new(vec![])) // Partition 1 is empty
+ .with_morsel_driven(true)
+ .build();
+
+ let exec = DataSourceExec::from_data_source(config);
+
+ // Execute both partitions concurrently
Review Comment:
FWIW I think
[`tokio::test`](https://docs.rs/tokio/latest/tokio/attr.test.html) only uses a
single thread --
You need to do something like
```rust
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
```
To make it truly multi-threaded (not sure if that is important for this test)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]