rluvaton commented on code in PR #15700: URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041222937
########## datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs: ########## @@ -753,3 +765,226 @@ async fn test_single_mode_aggregate_with_spill() -> Result<()> { Ok(()) } + +/// A Mock ExecutionPlan that can be used for writing tests of other +/// ExecutionPlans +pub struct StreamExec { + /// the results to send back + stream: Mutex<Option<SendableRecordBatchStream>>, + /// if true (the default), sends data using a separate task to ensure the + /// batches are not available without this stream yielding first + use_task: bool, + cache: PlanProperties, +} + +impl Debug for StreamExec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "StreamExec") + } +} + +impl StreamExec { + /// Create a new `MockExec` with a single partition that returns + /// the specified `Results`s. + /// + /// By default, the batches are not produced immediately (the + /// caller has to actually yield and another task must run) to + /// ensure any poll loops are correct. This behavior can be + /// changed with `with_use_task` + pub fn new(stream: SendableRecordBatchStream) -> Self { + let cache = Self::compute_properties(stream.schema()); + Self { + stream: Mutex::new(Some(stream)), + use_task: true, + cache, + } + } + + /// If `use_task` is true (the default) then the batches are sent + /// back using a separate task to ensure the underlying stream is + /// not immediately ready + pub fn with_use_task(mut self, use_task: bool) -> Self { + self.use_task = use_task; + self + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
+ fn compute_properties(schema: SchemaRef) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } +} + +impl DisplayAs for StreamExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "StreamExec:") + } + DisplayFormatType::TreeRender => { + // TODO: collect info + write!(f, "") + } + } + } +} + +impl ExecutionPlan for StreamExec { + fn name(&self) -> &'static str { + Self::static_name() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![] + } + + fn with_new_children( + self: Arc<Self>, + _: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + unimplemented!() + } + + /// Returns a stream which yields data + fn execute( + &self, + partition: usize, + _context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + assert_eq!(partition, 0); + + let stream = self.stream.lock().unwrap().take(); + + stream.ok_or(DataFusionError::Internal( + "Stream already consumed".to_string(), + )) + } +} + + +#[tokio::test] +async fn test_low_cardinality() -> Result<()> { Review Comment: This test fails on main with an out-of-memory (OOM) error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org