alamb commented on code in PR #20481:
URL: https://github.com/apache/datafusion/pull/20481#discussion_r2852747458
##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -113,15 +151,123 @@ impl FileStream {
FileStreamState::Idle => {
self.file_stream_metrics.time_opening.start();
- match self.start_next_file().transpose() {
-                    Ok(Some(future)) => self.state = FileStreamState::Open { future },
- Ok(None) => return Poll::Ready(None),
+ if self.morsel_driven {
Review Comment:
I think it would help make this code easier to understand and test if this
control loop were encapsulated rather than inlined (maybe as a method on WorkQueue?)
##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -43,10 +45,25 @@ use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{FutureExt as _, Stream, StreamExt as _, ready};
+/// A guard that decrements the morselizing count when dropped.
+struct MorselizingGuard {
+ queue: Arc<WorkQueue>,
+}
+
+impl Drop for MorselizingGuard {
+ fn drop(&mut self) {
+ self.queue.stop_morselizing();
+ }
+}
+
/// A stream that iterates record batch by record batch, file over file.
pub struct FileStream {
/// An iterator over input files.
file_iter: VecDeque<PartitionedFile>,
+ /// Shared work queue for morsel-driven execution.
Review Comment:
While reading this code, it looks to me like the key idea is to break a
stream of PartitionedFiles into another stream of Morsels
I was thinking that instead of having two different control flow paths (one
for Morsels and one for PartitionedFiles), it might be cleaner to add another
layer of abstraction that takes a stream of `PartitionedFile`s and produces a
stream of `ParquetAccessPlan`s.
When "morselizing", a single PartitionedFile could (potentially) produce
multiple `ParquetAccessPlan`s.
When not "morselizing", a single PartitionedFile would produce one
`ParquetAccessPlan`, as today.
I think this is pretty close to what WorkQueue does now except the control
flow is different in the two cases.
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -180,7 +195,293 @@ impl PreparedAccessPlan {
}
}
+impl ParquetOpener {
+ fn build_row_group_access_filter(
+ file_name: &str,
+ extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+ row_group_count: usize,
+ row_group_metadata: &[RowGroupMetaData],
+ file_range: Option<&FileRange>,
+ stats_pruning: Option<RowGroupStatisticsPruningContext<'_>>,
+ ) -> Result<RowGroupAccessPlanFilter> {
+ let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
+ file_name,
+ extensions,
+ row_group_count,
+ )?);
+
+ if let Some(range) = file_range {
+ row_groups.prune_by_range(row_group_metadata, range);
+ }
+
+ if let Some(stats_pruning) = stats_pruning {
+ row_groups.prune_by_statistics(
+ stats_pruning.physical_file_schema.as_ref(),
+ stats_pruning.parquet_schema,
+ row_group_metadata,
+ stats_pruning.predicate,
+ stats_pruning.file_metrics,
+ );
+ }
+
+ Ok(row_groups)
+ }
+}
+
impl FileOpener for ParquetOpener {
+ fn is_leaf_morsel(&self, file: &PartitionedFile) -> bool {
+ file.extensions
+ .as_ref()
+ .map(|e| e.is::<ParquetMorsel>())
+ .unwrap_or(false)
+ }
+
+ fn morselize(
+ &self,
+ partitioned_file: PartitionedFile,
+ ) -> BoxFuture<'static, Result<Vec<PartitionedFile>>> {
+ if partitioned_file
+ .extensions
+ .as_ref()
+ .map(|e| e.is::<ParquetMorsel>())
+ .unwrap_or(false)
+ {
+ return Box::pin(ready(Ok(vec![partitioned_file])));
Review Comment:
I found this mechanism a little confusing -- it seems to me like it could
override the ParquetAccessPlan that a user provides 🤔
I have some ideas on how to make the mapping of `PartitionedFile` -->
`ParquetAccessPlan` more explicit, which I think then would avoid having to
save a ParquetMorsel in the extensions
##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -130,9 +130,16 @@ impl FileStream {
///
/// Since file opening is mostly IO (and may involve a
/// bunch of sequential IO), it can be parallelized with decoding.
+ ///
+ /// In morsel-driven mode this prefetches the next already-morselized item
+ /// from the shared queue (leaf morsels only; items that still need
+ /// async morselization are left in the queue for the normal Idle →
+ /// Morselizing path).
fn start_next_file(&mut self) -> Option<Result<FileOpenFuture>> {
if self.morsel_driven {
- return None;
+ let queue = Arc::clone(self.shared_queue.as_ref()?);
Review Comment:
It is probably too much for this PR, but I think it would be great if we
could move the code towards having a more explicit pipeline of operations. I
think this could make the code simpler and set us up to potentially add
deeper prefetching and better tune CPU and IO:
For example, it seems to me we have the following steps:
```
┌─────────────────────┐    ┌─────────────────────┐    ┌─────────────────────┐    ┌─────────────────────┐
│ Open File           │    │ Plan                │    │ Fetch Data          │    │ Decode Data         │
│                     │───▶│                     │───▶│                     │───▶│                     │
│ (fetch / decode     │    │ (Apply statistics + │    │ (fetch Row Group    │    │                     │
│  ParquetMetadata)   │    │  pruning)           │    │  data)              │    │                     │
└─────────────────────┘    └─────────────────────┘    └─────────────────────┘    └─────────────────────┘
       IO+CPU                      CPU                       IO                        CPU
```
I wonder if we can model the various tasks as async Streams.
Something like
1. `Stream<Item=ParquetMetadata>`
2. Then mapped to a `Stream<ParquetAccessPlan>`
3. Then mapped to a `Stream<RecordBatch>`
And we could then separate out the logic that orchestrates the flow of data
between the stages from the operations that happen inside each stage
##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -130,9 +130,16 @@ impl FileStream {
///
/// Since file opening is mostly IO (and may involve a
/// bunch of sequential IO), it can be parallelized with decoding.
+ ///
+ /// In morsel-driven mode this prefetches the next already-morselized item
+ /// from the shared queue (leaf morsels only; items that still need
+ /// async morselization are left in the queue for the normal Idle →
+ /// Morselizing path).
fn start_next_file(&mut self) -> Option<Result<FileOpenFuture>> {
if self.morsel_driven {
- return None;
+ let queue = Arc::clone(self.shared_queue.as_ref()?);
Review Comment:
Again, I don't think we should do this in this PR, but if we can start
moving towards this design (e.g. use Streams rather than an explicit WorkQueue)
maybe that would be good
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]