Copilot commented on code in PR #15473:
URL: https://github.com/apache/datafusion/pull/15473#discussion_r2018163460


##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -575,6 +575,95 @@ impl FileScanConfig {
         })
     }
 
+    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
+    ///
+    /// The method distributes files across a target number of partitions while ensuring
+    /// files within each partition maintain sort order based on their min/max statistics.
+    ///
+    /// The algorithm works by:
+    /// 1. Sorting all files by their minimum values
+    /// 2. Trying to place each file into an existing group where it can maintain sort order
+    /// 3. Creating new groups when necessary if a file cannot fit into existing groups
+    /// 4. Prioritizing smaller groups when multiple suitable groups exist (for load balancing)
+    ///
+    /// # Parameters
+    /// * `table_schema`: Schema containing information about the columns
+    /// * `file_groups`: The original file groups to split
+    /// * `sort_order`: The lexicographical ordering to maintain within each group
+    /// * `target_partitions`: The desired number of output partitions
+    ///
+    /// # Returns
+    /// A new set of file groups, where files within each group are non-overlapping with respect to
+    /// their min/max statistics and maintain the specified sort order.
+    pub fn split_groups_by_statistics_v2(
+        table_schema: &SchemaRef,
+        file_groups: &[FileGroup],
+        sort_order: &LexOrdering,
+        target_partitions: usize,
+    ) -> Result<Vec<FileGroup>> {
+        let flattened_files = file_groups
+            .iter()
+            .flat_map(FileGroup::iter)
+            .collect::<Vec<_>>();
+
+        if flattened_files.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let statistics = MinMaxStatistics::new_from_files(
+            sort_order,
+            table_schema,
+            None,
+            flattened_files.iter().copied(),
+        )?;
+
+        let indices_sorted_by_min = statistics.min_values_sorted();
+
+        // Initialize with target_partitions empty groups
+        let mut file_groups_indices: Vec<Vec<usize>> =
+            vec![vec![]; target_partitions.max(1)];
+
+        for (idx, min) in indices_sorted_by_min {
+            // Find all groups where the file can fit
+            let mut suitable_groups: Vec<(usize, &mut Vec<usize>)> = file_groups_indices
+                .iter_mut()
+                .enumerate()
+                .filter(|(_, group)| {
+                    group.is_empty()
+                        || min
+                            > statistics
+                                .max(*group.last().expect("groups should not be empty"))
+                })
+                .collect();
+
+            // Sort by group size to prioritize smaller groups
+            suitable_groups.sort_by_key(|(_, group)| group.len());
+
+            if let Some((_, group)) = suitable_groups.first_mut() {
+                group.push(idx);
+            } else {
+                // Create a new group if no existing group fits
+                file_groups_indices.push(vec![idx]);

Review Comment:
   When no existing group fits, this branch always creates a new group, so the
   result can contain more groups than `target_partitions`. If the output is
   meant to be capped at that number, consider enforcing an upper limit based
   on `target_partitions`.
   ```suggestion
               } else if file_groups_indices.len() < target_partitions {
                   // Create a new group if no existing group fits and limit not reached
                   file_groups_indices.push(vec![idx]);
               } else {
                   // Assign to the smallest existing group if limit is reached
                   let (_, smallest_group) = file_groups_indices
                       .iter_mut()
                       .enumerate()
                       .min_by_key(|(_, group)| group.len())
                       .expect("There should be at least one group");
                   smallest_group.push(idx);
   ```
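
   To make the trade-off concrete, here is a minimal, self-contained sketch of
   the greedy grouping, simplified to a single integer sort key. `FileRange`,
   `group_by_range`, and `is_ordered` are hypothetical names used only for
   illustration and are not part of the DataFusion API. It is meant to show two
   things: mutually overlapping files can push the group count past
   `target_partitions`, and appending such a file to an existing group (as the
   capped fallback would) leaves that group no longer non-overlapping.

   ```rust
   /// Hypothetical stand-in for a file's min/max statistics on the sort key.
   #[derive(Debug, Clone, Copy)]
   struct FileRange {
       min: i64,
       max: i64,
   }

   /// Returns true if each file in `group` starts strictly after the previous one ends.
   fn is_ordered(files: &[FileRange], group: &[usize]) -> bool {
       group
           .windows(2)
           .all(|w| files[w[1]].min > files[w[0]].max)
   }

   /// Simplified greedy grouping: visit files by ascending `min` and place each
   /// one in the smallest group it can extend, or open a new group otherwise.
   /// Returns groups of indices into `files`.
   fn group_by_range(files: &[FileRange], target_partitions: usize) -> Vec<Vec<usize>> {
       let mut order: Vec<usize> = (0..files.len()).collect();
       order.sort_by_key(|&i| files[i].min);

       // Start with `target_partitions` empty groups (at least one).
       let mut groups: Vec<Vec<usize>> = vec![vec![]; target_partitions.max(1)];

       for idx in order {
           // Smallest group that is empty or whose last file ends before this one starts.
           let best = groups
               .iter()
               .enumerate()
               .filter(|(_, g)| {
                   g.last()
                       .map_or(true, |&last| files[idx].min > files[last].max)
               })
               .min_by_key(|(_, g)| g.len())
               .map(|(i, _)| i);

           match best {
               Some(i) => groups[i].push(idx),
               // No group fits: the group count grows past the initial size.
               None => groups.push(vec![idx]),
           }
       }
       groups
   }
   ```

   With three files covering `[0, 10]`, `[5, 15]`, and `[8, 20]` and
   `target_partitions = 2`, this sketch yields three groups, one file each.
   Forcing the third file into the first group instead would give a group
   whose files overlap, so a cap on the group count has to be weighed against
   the non-overlapping guarantee stated in the doc comment.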



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

