suremarc commented on code in PR #15473:
URL: https://github.com/apache/datafusion/pull/15473#discussion_r2021230783
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -575,6 +575,95 @@ impl FileScanConfig {
         })
     }
 
+    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
+    ///
+    /// The method distributes files across a target number of partitions while ensuring
+    /// files within each partition maintain sort order based on their min/max statistics.
+    ///
+    /// The algorithm works by:
+    /// 1. Sorting all files by their minimum values
+    /// 2. Trying to place each file into an existing group where it can maintain sort order
+    /// 3. Creating new groups when necessary if a file cannot fit into existing groups
+    /// 4. Prioritizing smaller groups when multiple suitable groups exist (for load balancing)
+    ///
+    /// # Parameters
+    /// * `table_schema`: Schema containing information about the columns
+    /// * `file_groups`: The original file groups to split
+    /// * `sort_order`: The lexicographical ordering to maintain within each group
+    /// * `target_partitions`: The desired number of output partitions
+    ///
+    /// # Returns
+    /// A new set of file groups, where files within each group are non-overlapping with respect to
+    /// their min/max statistics and maintain the specified sort order.
+    pub fn split_groups_by_statistics_v2(
+        table_schema: &SchemaRef,
+        file_groups: &[FileGroup],
+        sort_order: &LexOrdering,
+        target_partitions: usize,
+    ) -> Result<Vec<FileGroup>> {
+        let flattened_files = file_groups
+            .iter()
+            .flat_map(FileGroup::iter)
+            .collect::<Vec<_>>();
+
+        if flattened_files.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let statistics = MinMaxStatistics::new_from_files(
+            sort_order,
+            table_schema,
+            None,
+            flattened_files.iter().copied(),
+        )?;
+
+        let indices_sorted_by_min = statistics.min_values_sorted();
+
+        // Initialize with target_partitions empty groups
+        let mut file_groups_indices: Vec<Vec<usize>> =
+            vec![vec![]; target_partitions.max(1)];
+
+        for (idx, min) in indices_sorted_by_min {
+            // Find all groups where the file can fit
+            let mut suitable_groups: Vec<(usize, &mut Vec<usize>)> = file_groups_indices
+                .iter_mut()
+                .enumerate()
+                .filter(|(_, group)| {
+                    group.is_empty()
+                        || min
+                            > statistics
+                                .max(*group.last().expect("groups should not be empty"))
+                })
+                .collect();
+
+            // Sort by group size to prioritize smaller groups
+            suitable_groups.sort_by_key(|(_, group)| group.len());
+
+            if let Some((_, group)) = suitable_groups.first_mut() {
+                group.push(idx);
+            } else {
+                // Create a new group if no existing group fits
+                file_groups_indices.push(vec![idx]);

Review Comment:
   I guess you are right, it should be rare in the real world, and a few hundred ms isn't the end of the world (it could be a few multiples of 89ms if there are multiple sort columns). Can probably revisit it if it becomes a problem.
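For anyone skimming the thread, here is a minimal, self-contained sketch of the assignment step the new method performs, reduced to plain `(min, max)` ranges on a single sort key. It is illustrative only: the name `split_ranges_by_min_max`, the `i64` ranges, and the `main` driver are stand-ins rather than the PR's `MinMaxStatistics`-based implementation.

    fn split_ranges_by_min_max(
        mut ranges: Vec<(i64, i64)>, // one (min, max) pair per file
        target_partitions: usize,
    ) -> Vec<Vec<(i64, i64)>> {
        // 1. Sort files by their minimum value.
        ranges.sort_by_key(|&(min, _)| min);

        // 2. Start with `target_partitions` empty groups.
        let mut groups: Vec<Vec<(i64, i64)>> = vec![vec![]; target_partitions.max(1)];

        for range in ranges {
            // 3. Among the groups that can accept this file without breaking
            //    sort order (empty, or last max < this min), pick the smallest.
            let candidate = groups
                .iter()
                .enumerate()
                .filter(|(_, g)| g.last().map_or(true, |&(_, max)| range.0 > max))
                .min_by_key(|(_, g)| g.len())
                .map(|(i, _)| i);

            match candidate {
                Some(i) => groups[i].push(range),
                // 4. No existing group fits: create one beyond the target count.
                None => groups.push(vec![range]),
            }
        }
        groups
    }

    fn main() {
        // Overlapping files force a third group even with target_partitions = 2.
        let groups = split_ranges_by_min_max(vec![(0, 10), (5, 15), (8, 20), (21, 30)], 2);
        println!("{groups:?}");
    }

With the overlapping ranges in `main`, a third group is created even though `target_partitions` is 2, mirroring the `else` branch in the PR code that pushes a new group when no existing group fits.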