alamb commented on code in PR #17293: URL: https://github.com/apache/datafusion/pull/17293#discussion_r2294421433
########## datafusion/datasource-parquet/src/opener.rs: ########## @@ -409,64 +411,103 @@ impl FileOpener for ParquetOpener { .with_row_groups(row_group_indexes) .build()?; - // Create a stateful stream that can check pruning after each batch - let adapted = { - use futures::stream; - let schema_mapping = Some(schema_mapping); - let file_pruner = file_pruner; - let stream = stream.map_err(|e| ArrowError::ExternalError(Box::new(e))); - let files_ranges_pruned_statistics = - file_metrics.files_ranges_pruned_statistics.clone(); - - stream::try_unfold( - ( - stream, - schema_mapping, - file_pruner, - files_ranges_pruned_statistics, - ), - move |( - mut stream, - schema_mapping_opt, - mut file_pruner, - files_ranges_pruned_statistics, - )| async move { - match stream.try_next().await? { - Some(batch) => { - let schema_mapping = schema_mapping_opt.as_ref().unwrap(); - let mapped_batch = schema_mapping.map_batch(batch)?; - - // Check if we can prune the file now - if let Some(ref mut pruner) = file_pruner { - if pruner.should_prune()? { - // File can now be pruned based on updated dynamic filters - files_ranges_pruned_statistics.add(1); - // Terminate the stream early - return Ok(None); - } - } - - Ok(Some(( - mapped_batch, - ( - stream, - schema_mapping_opt, - file_pruner, - files_ranges_pruned_statistics, - ), - ))) - } - None => Ok(None), - } - }, - ) - }; + let stream = stream + .map_err(|e| ArrowError::ExternalError(Box::new(e))) + .map(move |b| b.and_then(|b| Ok(schema_mapping.map_batch(b)?))); - Ok(adapted.boxed()) + if let Some(file_pruner) = file_pruner { + Ok(EarlyStoppingStream::new( + stream, + file_pruner, + file_metrics.files_ranges_pruned_statistics.clone(), + ) + .boxed()) + } else { + Ok(stream.boxed()) Review Comment: I think this also makes it a bit easier to understand that the pruning only happens when we have a file pruner -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org