xudong963 commented on code in PR #15379: URL: https://github.com/apache/datafusion/pull/15379#discussion_r2011265887
########## datafusion/datasource/src/file_groups.rs: ########## @@ -354,6 +361,115 @@ impl FileGroupPartitioner { } } +/// Represents a group of partitioned files that'll be processed by a single thread. +/// Maintains optional statistics across all files in the group. +#[derive(Debug, Clone)] +pub struct FileGroup { + /// The files in this group + pub files: Vec<PartitionedFile>, + /// Optional statistics for all files in the group + pub statistics: Option<Statistics>, +} + +impl FileGroup { + /// Creates a new FileGroup from a vector of PartitionedFile objects + pub fn new(files: Vec<PartitionedFile>) -> Self { + Self { + files, + statistics: None, + } + } + + /// Returns the number of files in this group + pub fn len(&self) -> usize { + self.files.len() + } + + /// Set the statistics for this group + pub fn with_statistics(mut self, statistics: Statistics) -> Self { + self.statistics = Some(statistics); + self + } + + pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> { + self.files.iter() + } + + pub fn is_empty(&self) -> bool { + self.files.is_empty() + } + + /// Removes the last element from the files vector and returns it, or None if empty + pub fn pop(&mut self) -> Option<PartitionedFile> { + self.files.pop() + } + + /// Adds a file to the group + pub fn push(&mut self, file: PartitionedFile) { + self.files.push(file); + } + + /// Partition the list of files into `n` groups + pub fn split_files(&mut self, n: usize) -> Vec<FileGroup> { + if self.is_empty() { + return vec![]; + } + + // ObjectStore::list does not guarantee any consistent order and for some + // implementations such as LocalFileSystem, it may be inconsistent. Thus + // Sort files by path to ensure consistent plans when run more than once. 
+ self.files.sort_by(|a, b| a.path().cmp(b.path())); + + // effectively this is div with rounding up instead of truncating + let chunk_size = self.len().div_ceil(n); + let mut chunks = Vec::with_capacity(n); + let mut current_chunk = Vec::with_capacity(chunk_size); + for file in self.files.drain(..) { + current_chunk.push(file); + if current_chunk.len() == chunk_size { + let full_chunk = FileGroup::new(mem::replace( + &mut current_chunk, + Vec::with_capacity(chunk_size), + )); + chunks.push(full_chunk); + } + } + + if !current_chunk.is_empty() { + chunks.push(FileGroup::new(current_chunk)) + } + + chunks + } +} + +impl Index<usize> for FileGroup { + type Output = PartitionedFile; + + fn index(&self, index: usize) -> &Self::Output { + &self.files[index] + } +} + +impl IndexMut<usize> for FileGroup { + fn index_mut(&mut self, index: usize) -> &mut Self::Output { + &mut self.files[index] + } +} + +impl FromIterator<PartitionedFile> for FileGroup { + fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self { + let files = iter.into_iter().collect(); + FileGroup::new(files) + } +} Review Comment: Yeah, added. Now I have three ways to create a `FileGroup` from a `Vec<PartitionedFile>`. ```rust let files = vec![...]; let file_group = FileGroup::new(files); let file_group = files.into(); let file_group = FileGroup::from(files); ``` I don't have a strong preference for which method to use, but the second is simpler! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org