friendlymatthew commented on code in PR #17242:
URL: https://github.com/apache/datafusion/pull/17242#discussion_r2301591751


##########
datafusion/datasource/src/file.rs:
##########
@@ -150,3 +165,193 @@ pub trait FileSource: Send + Sync {
         None
     }
 }
+
+impl<T: FileSource + 'static> DataSource for T {
+    fn open(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.config().object_store_url)?;
+
+        let batch_size = self
+            .config()
+            .batch_size
+            .unwrap_or_else(|| context.session_config().batch_size());
+
+        let config = FileScanConfigBuilder::from(self.config().clone())
+            .with_batch_size(Some(batch_size))
+            .build();
+
+        let source = self.with_config(config);
+
+        let opener = source.create_file_opener(object_store, partition);
+
+        let stream =
+            FileStream::new(source.config(), partition, opener, 
source.metrics_inner())?;
+        Ok(Box::pin(cooperative(stream)))
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let schema = self.config().projected_schema();
+                let orderings = get_projected_output_ordering(self.config(), 
&schema);
+
+                write!(f, "file_groups=")?;
+                FileGroupsDisplay(&self.config().file_groups).fmt_as(t, f)?;
+
+                if !schema.fields().is_empty() {
+                    write!(f, ", projection={}", 
ProjectSchemaDisplay(&schema))?;
+                }
+
+                if let Some(limit) = self.config().limit {
+                    write!(f, ", limit={limit}")?;
+                }
+
+                display_orderings(f, &orderings)?;
+
+                if !self.config().constraints.is_empty() {
+                    write!(f, ", {}", self.config().constraints)?;
+                }
+
+                write!(f, ", file_type={}", self.file_type())?;
+                self.fmt_extra(t, f)
+            }
+            DisplayFormatType::TreeRender => {
+                writeln!(f, "format={}", self.file_type())?;
+                self.fmt_extra(t, f)?;
+                let num_files = self
+                    .config()
+                    .file_groups
+                    .iter()
+                    .map(|fg| fg.len())
+                    .sum::<usize>();
+                writeln!(f, "files={num_files}")?;
+                Ok(())
+            }
+        }
+    }
+
+    /// If supported by the underlying [`FileSource`], redistribute files 
across partitions according to their size.
+    fn repartitioned(
+        &self,
+        target_partitions: usize,
+        repartition_file_min_size: usize,
+        output_ordering: Option<LexOrdering>,
+    ) -> Result<Option<Arc<dyn DataSource>>> {
+        let source = self.repartitioned_inner(
+            target_partitions,
+            repartition_file_min_size,
+            output_ordering,
+        )?;
+
+        Ok(source.map(|s| s.as_data_source()))
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(self.config().file_groups.len())
+    }
+
+    fn eq_properties(&self) -> EquivalenceProperties {
+        let (schema, constraints, _, orderings) =
+            self.config().project(self.file_source_statistics());
+        EquivalenceProperties::new_with_orderings(schema, orderings)
+            .with_constraints(constraints)
+    }
+
+    fn scheduling_type(&self) -> SchedulingType {
+        SchedulingType::Cooperative
+    }
+
+    fn data_source_statistics(&self) -> Result<Statistics> {
+        Ok(self.config().projected_stats(self.file_source_statistics()))
+    }

Review Comment:
   So `self.file_source_statistics()` isn't unpacked as 
`self.config.file_source_projected_statistics`, since `ParquetSource` overrides 
the default impl of that trait method.
   
   But I agree with you that we can simplify this. IIRC, I chose to keep it
like this to get a "minimal" refactor compiling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to