alamb commented on code in PR #20481:
URL: https://github.com/apache/datafusion/pull/20481#discussion_r2852747458


##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -113,15 +151,123 @@ impl FileStream {
                 FileStreamState::Idle => {
                     self.file_stream_metrics.time_opening.start();
 
-                    match self.start_next_file().transpose() {
-                        Ok(Some(future)) => self.state = FileStreamState::Open { future },
-                        Ok(None) => return Poll::Ready(None),
+                    if self.morsel_driven {

Review Comment:
   I think it would make this code easier to understand and test if this 
control loop were encapsulated rather than inlined (maybe as a method on 
WorkQueue?)
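
   A minimal sketch of what that encapsulation could look like. All names here (`WorkItem`, `NextStep`, `next_step`) are hypothetical stand-ins, not the PR's actual types — the point is only that the queue itself decides the next state transition instead of the poll loop branching inline:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical queue entry: either an already-morselized leaf that can be
// opened directly, or a file that still needs async morselization.
#[derive(Debug, Clone, PartialEq)]
enum WorkItem {
    Leaf(String),
    Unmorselized(String),
}

// What the caller should do next, returned instead of letting the stream's
// poll loop inspect queue internals inline.
#[derive(Debug, PartialEq)]
enum NextStep {
    Open(String),
    Morselize(String),
    Done,
}

struct WorkQueue {
    items: Mutex<VecDeque<WorkItem>>,
}

impl WorkQueue {
    fn new(items: Vec<WorkItem>) -> Self {
        Self {
            items: Mutex::new(items.into_iter().collect()),
        }
    }

    // The encapsulated control decision: pop the next item and tell the
    // state machine which transition to take. This is the piece that can
    // be unit tested without constructing a FileStream.
    fn next_step(&self) -> NextStep {
        match self.items.lock().unwrap().pop_front() {
            Some(WorkItem::Leaf(f)) => NextStep::Open(f),
            Some(WorkItem::Unmorselized(f)) => NextStep::Morselize(f),
            None => NextStep::Done,
        }
    }
}
```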
   
   



##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -43,10 +45,25 @@ use futures::future::BoxFuture;
 use futures::stream::BoxStream;
 use futures::{FutureExt as _, Stream, StreamExt as _, ready};
 
+/// A guard that decrements the morselizing count when dropped.
+struct MorselizingGuard {
+    queue: Arc<WorkQueue>,
+}
+
+impl Drop for MorselizingGuard {
+    fn drop(&mut self) {
+        self.queue.stop_morselizing();
+    }
+}
+
 /// A stream that iterates record batch by record batch, file over file.
 pub struct FileStream {
     /// An iterator over input files.
     file_iter: VecDeque<PartitionedFile>,
+    /// Shared work queue for morsel-driven execution.

Review Comment:
   While reading this code, it looks to me like the key idea is to break a 
stream of PartitionedFiles into another stream of Morsels
   
   I was thinking that instead of having two different control flow paths (one 
for Morsels and one for `PartitionedFile`s), it might be cleaner to add another 
layer of abstraction that takes a stream of `PartitionedFile`s and produces a 
stream of `ParquetAccessPlan`s
   
   When "morselizing", a single `PartitionedFile` could (potentially) produce 
multiple `ParquetAccessPlan`s
   When not "morselizing", a single `PartitionedFile` would produce one 
`ParquetAccessPlan`, as today
   
   I think this is pretty close to what WorkQueue does now except the control 
flow is different in the two cases.
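
   To sketch the proposed layer with a synchronous analogue (plain functions and `Vec`s standing in for async streams; `to_access_plans` and the simplified types are illustrative, not the real API):

```rust
// Hypothetical simplified stand-ins for the real types.
#[derive(Debug, Clone)]
struct PartitionedFile {
    path: String,
    row_groups: usize,
}

#[derive(Debug, PartialEq)]
struct AccessPlan {
    path: String,
    // Row groups this plan will scan.
    row_groups: Vec<usize>,
}

// The proposed abstraction: one PartitionedFile yields one plan per row
// group when morselizing, or a single whole-file plan otherwise. In the
// real code this would map a Stream of files to a Stream of plans, so the
// two modes share a single control flow path downstream.
fn to_access_plans(file: &PartitionedFile, morselize: bool) -> Vec<AccessPlan> {
    if morselize {
        (0..file.row_groups)
            .map(|rg| AccessPlan {
                path: file.path.clone(),
                row_groups: vec![rg],
            })
            .collect()
    } else {
        vec![AccessPlan {
            path: file.path.clone(),
            row_groups: (0..file.row_groups).collect(),
        }]
    }
}
```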



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -180,7 +195,293 @@ impl PreparedAccessPlan {
     }
 }
 
+impl ParquetOpener {
+    fn build_row_group_access_filter(
+        file_name: &str,
+        extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+        row_group_count: usize,
+        row_group_metadata: &[RowGroupMetaData],
+        file_range: Option<&FileRange>,
+        stats_pruning: Option<RowGroupStatisticsPruningContext<'_>>,
+    ) -> Result<RowGroupAccessPlanFilter> {
+        let mut row_groups = RowGroupAccessPlanFilter::new(create_initial_plan(
+            file_name,
+            extensions,
+            row_group_count,
+        )?);
+
+        if let Some(range) = file_range {
+            row_groups.prune_by_range(row_group_metadata, range);
+        }
+
+        if let Some(stats_pruning) = stats_pruning {
+            row_groups.prune_by_statistics(
+                stats_pruning.physical_file_schema.as_ref(),
+                stats_pruning.parquet_schema,
+                row_group_metadata,
+                stats_pruning.predicate,
+                stats_pruning.file_metrics,
+            );
+        }
+
+        Ok(row_groups)
+    }
+}
+
 impl FileOpener for ParquetOpener {
+    fn is_leaf_morsel(&self, file: &PartitionedFile) -> bool {
+        file.extensions
+            .as_ref()
+            .map(|e| e.is::<ParquetMorsel>())
+            .unwrap_or(false)
+    }
+
+    fn morselize(
+        &self,
+        partitioned_file: PartitionedFile,
+    ) -> BoxFuture<'static, Result<Vec<PartitionedFile>>> {
+        if partitioned_file
+            .extensions
+            .as_ref()
+            .map(|e| e.is::<ParquetMorsel>())
+            .unwrap_or(false)
+        {
+            return Box::pin(ready(Ok(vec![partitioned_file])));

Review Comment:
   I found this mechanism a little confusing -- it seems to me like it could 
override the ParquetAccessPlan that a user provides πŸ€” 
   
   I have some ideas on how to make the mapping of `PartitionedFile` --> 
`ParquetAccessPlan` more explicit, which I think would then avoid having to 
save a `ParquetMorsel` in the extensions
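
   One way to make the mapping explicit (a hedged sketch; `ScanUnit`, `classify`, and the simplified types are hypothetical, not the PR's API) is an enum that states up front whether a plan was already chosen, instead of downcasting an `Any` extension — the type system then prevents morselizing from silently overriding a user-provided plan:

```rust
// Hypothetical simplified stand-ins for the real types.
#[derive(Debug, Clone, PartialEq)]
struct AccessPlan {
    row_groups: Vec<usize>,
}

#[derive(Debug, Clone)]
struct PartitionedFile {
    path: String,
    // A user-supplied access plan, if any.
    user_plan: Option<AccessPlan>,
}

// An explicit scan unit instead of a ParquetMorsel hidden in the `Any`
// extensions: the variant itself says whether a plan already exists.
#[derive(Debug)]
enum ScanUnit {
    // Scan the whole file; a plan still needs to be derived (and may be
    // split into morsels).
    WholeFile(PartitionedFile),
    // A concrete, already-decided plan for (part of) the file; must not
    // be re-morselized or overridden.
    Planned { file: PartitionedFile, plan: AccessPlan },
}

fn classify(file: PartitionedFile) -> ScanUnit {
    match file.user_plan.clone() {
        Some(plan) => ScanUnit::Planned { file, plan },
        None => ScanUnit::WholeFile(file),
    }
}
```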



##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -130,9 +130,16 @@ impl FileStream {
     ///
     /// Since file opening is mostly IO (and may involve a
     /// bunch of sequential IO), it can be parallelized with decoding.
+    ///
+    /// In morsel-driven mode this prefetches the next already-morselized item
+    /// from the shared queue (leaf morsels only β€” items that still need
+    /// async morselization are left in the queue for the normal Idle β†’
+    /// Morselizing path).
     fn start_next_file(&mut self) -> Option<Result<FileOpenFuture>> {
         if self.morsel_driven {
-            return None;
+            let queue = Arc::clone(self.shared_queue.as_ref()?);

Review Comment:
   It is probably too much for this PR, but I think it would be great if we 
could move the code towards a more explicit pipeline of operations. I think 
this could make the code simpler and set it up to potentially add deeper 
prefetching and better tune CPU and IO:
   
   For example, it seems to me we have the following steps:
   
   ```
   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
   β”‚     Open File      β”‚    β”‚        Plan        β”‚    β”‚     Fetch Data     β”‚    β”‚                    β”‚
   β”‚  (fetch / decode   │───▢│(Apply statistics + │───▢│  (fetch Row Group  │───▢│    Decode Data     β”‚
   β”‚  ParquetMetadata)  β”‚    β”‚      pruning)      β”‚    β”‚       data)        β”‚    β”‚                    β”‚
   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
           IO+CPU                     CPU                       IO                       CPU
   ```
   
   I wonder if we can model the various tasks as async `Stream`s.
   
   Something like
   1. `Stream<Item=ParquetMetadata>`
   2. Then mapped to a `Stream<ParquetAccessPlan>`
   3. Then mapped to a `Stream<RecordBatch>`
   
   And we could then separate the logic that orchestrates the flow of data 
between the stages from the operations that happen inside each stage
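
   As a synchronous analogue of that pipeline (plain iterator adapters standing in for `Stream` combinators; `pipeline` and the simplified types are hypothetical sketches, with pruning and IO stubbed out):

```rust
// Hypothetical simplified stand-ins; in the real code each stage would be
// an async Stream and the maps would be Stream combinators.
#[derive(Debug)]
struct ParquetMetadata {
    path: String,
    row_groups: usize,
}

#[derive(Debug)]
struct AccessPlan {
    path: String,
    row_group: usize,
}

#[derive(Debug, PartialEq)]
struct RecordBatch {
    rows: usize,
}

// Stage 1 (Open File, IO+CPU) is assumed to have already produced the
// metadata fed into this function.
fn pipeline(metadata: Vec<ParquetMetadata>) -> Vec<RecordBatch> {
    metadata
        .into_iter()
        // Stage 2 (Plan, CPU): statistics pruning would go here; this
        // sketch just emits one access plan per row group.
        .flat_map(|md| {
            (0..md.row_groups).map(move |rg| AccessPlan {
                path: md.path.clone(),
                row_group: rg,
            })
        })
        // Stages 3-4 (Fetch Data, IO; Decode Data, CPU): fetch and decode
        // the row group; this sketch fabricates a fixed-size batch.
        .map(|_plan| RecordBatch { rows: 1024 })
        .collect()
}
```

   Separating orchestration (the adapter chain) from per-stage work (the closure bodies) is what would let prefetch depth and CPU/IO scheduling be tuned per stage.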



##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -130,9 +130,16 @@ impl FileStream {
     ///
     /// Since file opening is mostly IO (and may involve a
     /// bunch of sequential IO), it can be parallelized with decoding.
+    ///
+    /// In morsel-driven mode this prefetches the next already-morselized item
+    /// from the shared queue (leaf morsels only β€” items that still need
+    /// async morselization are left in the queue for the normal Idle β†’
+    /// Morselizing path).
     fn start_next_file(&mut self) -> Option<Result<FileOpenFuture>> {
         if self.morsel_driven {
-            return None;
+            let queue = Arc::clone(self.shared_queue.as_ref()?);

Review Comment:
   Again, I don't think we should do this in this PR, but if we can start 
moving towards this design (e.g. using Streams rather than an explicit 
WorkQueue), maybe that would be good



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

