Copilot commented on code in PR #15473: URL: https://github.com/apache/datafusion/pull/15473#discussion_r2018163460
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -575,6 +575,95 @@ impl FileScanConfig {
         })
     }
+    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
+    ///
+    /// The method distributes files across a target number of partitions while ensuring
+    /// files within each partition maintain sort order based on their min/max statistics.
+    ///
+    /// The algorithm works by:
+    /// 1. Sorting all files by their minimum values
+    /// 2. Trying to place each file into an existing group where it can maintain sort order
+    /// 3. Creating new groups when necessary if a file cannot fit into existing groups
+    /// 4. Prioritizing smaller groups when multiple suitable groups exist (for load balancing)
+    ///
+    /// # Parameters
+    /// * `table_schema`: Schema containing information about the columns
+    /// * `file_groups`: The original file groups to split
+    /// * `sort_order`: The lexicographical ordering to maintain within each group
+    /// * `target_partitions`: The desired number of output partitions
+    ///
+    /// # Returns
+    /// A new set of file groups, where files within each group are non-overlapping with respect to
+    /// their min/max statistics and maintain the specified sort order.
+    pub fn split_groups_by_statistics_v2(
+        table_schema: &SchemaRef,
+        file_groups: &[FileGroup],
+        sort_order: &LexOrdering,
+        target_partitions: usize,
+    ) -> Result<Vec<FileGroup>> {
+        let flattened_files = file_groups
+            .iter()
+            .flat_map(FileGroup::iter)
+            .collect::<Vec<_>>();
+
+        if flattened_files.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let statistics = MinMaxStatistics::new_from_files(
+            sort_order,
+            table_schema,
+            None,
+            flattened_files.iter().copied(),
+        )?;
+
+        let indices_sorted_by_min = statistics.min_values_sorted();
+
+        // Initialize with target_partitions empty groups
+        let mut file_groups_indices: Vec<Vec<usize>> =
+            vec![vec![]; target_partitions.max(1)];
+
+        for (idx, min) in indices_sorted_by_min {
+            // Find all groups where the file can fit
+            let mut suitable_groups: Vec<(usize, &mut Vec<usize>)> = file_groups_indices
+                .iter_mut()
+                .enumerate()
+                .filter(|(_, group)| {
+                    group.is_empty()
+                        || min
+                            > statistics
+                                .max(*group.last().expect("groups should not be empty"))
+                })
+                .collect();
+
+            // Sort by group size to prioritize smaller groups
+            suitable_groups.sort_by_key(|(_, group)| group.len());
+
+            if let Some((_, group)) = suitable_groups.first_mut() {
+                group.push(idx);
+            } else {
+                // Create a new group if no existing group fits
+                file_groups_indices.push(vec![idx]);

Review Comment:
   Creating a new group when no existing group fits may result in more groups than the target partitions. Consider enforcing an upper limit based on target_partitions if the desired behavior is to restrict the output to that number.
   ```suggestion
            } else if file_groups_indices.len() < target_partitions {
                // Create a new group if no existing group fits and limit not reached
                file_groups_indices.push(vec![idx]);
            } else {
                // Assign to the smallest existing group if limit is reached
                let (_, smallest_group) = file_groups_indices
                    .iter_mut()
                    .enumerate()
                    .min_by_key(|(_, group)| group.len())
                    .expect("There should be at least one group");
                smallest_group.push(idx);
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org