alamb commented on code in PR #13540:
URL: https://github.com/apache/datafusion/pull/13540#discussion_r1864256159
##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -365,8 +366,165 @@ impl RecordBatchStream for MemoryStream {
}
}
+pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
+ /// Generate the next batch, return `None` when no more batches are
available
+ fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>>;
+}
+
+/// Execution plan for lazy in-memory batches of data
+///
+/// This plan generates output batches lazily, it doesn't have to buffer all
batches
+/// in memory up front (compared to `MemoryExec`), thus consuming constant
memory.
+pub struct LazyMemoryExec {
+ /// Schema representing the data
+ schema: SchemaRef,
+ /// Functions to generate batches for each partition
+ batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
+ /// Total number of rows to generate for statistics
Review Comment:
This comment doesn't seem quite right.
##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -365,8 +365,165 @@ impl RecordBatchStream for MemoryStream {
}
}
+pub trait StreamingBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
+ /// Generate the next batch, return `None` when no more batches are
available
+ fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>>;
+
+ /// Creates a boxed clone of this generator.
+ ///
+ /// This method is required because `Clone` cannot be directly implemented
for
+ /// trait objects. It provides a way to clone trait objects of
+ /// StreamingBatchGenerator while maintaining proper type erasure.
+ fn clone_box(&self) -> Box<dyn StreamingBatchGenerator>;
+}
+
+/// Execution plan for streaming in-memory batches of data
+///
+/// This plan generates output batches lazily, it doesn't have to buffer all
batches
+/// in memory up front (compared to `MemoryExec`), thus consuming constant
memory.
+pub struct StreamingMemoryExec {
+ /// Schema representing the data
+ schema: SchemaRef,
+ /// Functions to generate batches for each partition
+ batch_generators: Vec<Box<dyn StreamingBatchGenerator>>,
+ /// Total number of rows to generate for statistics
+ cache: PlanProperties,
+}
+
+impl StreamingMemoryExec {
+ /// Create a new streaming memory execution plan
+ pub fn try_new(
+ schema: SchemaRef,
+ generators: Vec<Box<dyn StreamingBatchGenerator>>,
+ ) -> Result<Self> {
+ let cache = PlanProperties::new(
+ EquivalenceProperties::new(Arc::clone(&schema)),
+ Partitioning::RoundRobinBatch(generators.len()),
Review Comment:
Maybe you can look at
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.repartitioned
or similar methods 🤔
##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -365,8 +366,165 @@ impl RecordBatchStream for MemoryStream {
}
}
+pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
Review Comment:
This looks almost identical to
https://docs.rs/datafusion/latest/datafusion/execution/type.SendableRecordBatchStream.html
I wonder if we really need a new `pub` trait? Using the existing one allows
using all the combinators in `futures::stream`.
That way this exec could be something generic that simply wraps an input
stream, which I think would be useful for several other reasons -- for example,
implementing custom TableProviders, as well as implementing `read_stream` like:
```rust
/// Read the contents of anything that produces RecordBatchStreams
///
let df = df.read_stream(record_batch_stream)
.aggregate(col("foo"))
.collect()?
```
##########
datafusion/catalog/src/table.rs:
##########
@@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send {
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>>;
}
+
+/// A trait for table function implementations
+pub trait TableFunctionImpl: Debug + Sync + Send {
Review Comment:
I think doing it as a follow-on PR makes sense to me -- perhaps we can file
a ticket.

Another thing that would be nice for a follow-on PR is to move this trait
into its own module (`catalog/src/table_function.rs`, perhaps).
##########
datafusion/functions-table/src/generate_series.rs:
##########
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::Int64Array;
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use datafusion_catalog::Session;
+use datafusion_catalog::TableFunctionImpl;
+use datafusion_catalog::TableProvider;
+use datafusion_common::{not_impl_err, plan_err, Result, ScalarValue};
+use datafusion_expr::{Expr, TableType};
+use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
+use datafusion_physical_plan::ExecutionPlan;
+use parking_lot::RwLock;
+use std::fmt;
+use std::sync::Arc;
+
+/// Table that generates a series of integers from `start`(inclusive) to
`end`(inclusive)
+#[derive(Debug, Clone)]
+struct GenerateSeriesTable {
+ schema: SchemaRef,
+ // None if input is Null
+ start: Option<i64>,
+ // None if input is Null
+ end: Option<i64>,
+}
+
+/// Table state that generates a series of integers from `start`(inclusive) to
`end`(inclusive)
+#[derive(Debug, Clone)]
+struct GenerateSeriesState {
+ schema: SchemaRef,
+ _start: i64, // Kept for display
+ end: i64,
+ batch_size: usize,
+
+ /// Tracks current position when generating table
+ current: i64,
+}
+
+/// Detail to display for 'Explain' plan
+impl fmt::Display for GenerateSeriesState {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(
+ f,
+ "generate_series: start={}, end={}, batch_size={}",
+ self._start, self.end, self.batch_size
+ )
+ }
+}
+
+impl LazyBatchGenerator for GenerateSeriesState {
+ fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> {
+ // Check if we've reached the end
+ if self.current > self.end {
+ return Ok(None);
+ }
+
+ // Construct batch
+ let batch_end = (self.current + self.batch_size as i64 -
1).min(self.end);
+ let array = Int64Array::from_iter_values(self.current..=batch_end);
+ let batch = RecordBatch::try_new(self.schema.clone(),
vec![Arc::new(array)])?;
+
+ // Update current position for next batch
+ self.current = batch_end + 1;
+
+ Ok(Some(batch))
+ }
+}
+
+#[async_trait]
+impl TableProvider for GenerateSeriesTable {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ state: &dyn Session,
+ _projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let batch_size = state.config_options().execution.batch_size;
+ match (self.start, self.end) {
+ (Some(start), Some(end)) => {
+ if start > end {
+ return plan_err!(
+ "End value must be greater than or equal to start
value"
+ );
+ }
+
+ Ok(Arc::new(LazyMemoryExec::try_new(
+ self.schema.clone(),
+ vec![Arc::new(RwLock::new(GenerateSeriesState {
+ schema: self.schema.clone(),
+ _start: start,
+ end,
+ current: start,
+ batch_size,
+ }))],
+ )?))
+ }
+ _ => {
+ // Either start or end is None, return a generator that
outputs 0 rows
+ Ok(Arc::new(LazyMemoryExec::try_new(
+ self.schema.clone(),
+ vec![Arc::new(RwLock::new(GenerateSeriesState {
+ schema: self.schema.clone(),
+ _start: 0,
+ end: 0,
+ current: 1,
+ batch_size,
+ }))],
+ )?))
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct GenerateSeriesFunc {}
+
+impl TableFunctionImpl for GenerateSeriesFunc {
+ // Check input `exprs` type and number. Input validity check (e.g. start
<= end)
+ // will be performed in `TableProvider::scan`
+ fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+ // TODO: support 1 or 3 arguments following DuckDB:
Review Comment:
Can we possibly file a ticket to track this?
##########
Cargo.toml:
##########
@@ -30,6 +30,7 @@ members = [
"datafusion/functions",
"datafusion/functions-aggregate",
"datafusion/functions-aggregate-common",
+ "datafusion/functions-table",
Review Comment:
100%
##########
datafusion/functions-table/src/generate_series.rs:
##########
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::Int64Array;
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use datafusion_catalog::Session;
+use datafusion_catalog::TableFunctionImpl;
+use datafusion_catalog::TableProvider;
+use datafusion_common::{not_impl_err, plan_err, Result, ScalarValue};
+use datafusion_expr::{Expr, TableType};
+use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
+use datafusion_physical_plan::ExecutionPlan;
+use parking_lot::RwLock;
+use std::fmt;
+use std::sync::Arc;
+
+/// Table that generates a series of integers from `start`(inclusive) to
`end`(inclusive)
+#[derive(Debug, Clone)]
+struct GenerateSeriesTable {
+ schema: SchemaRef,
+ // None if input is Null
+ start: Option<i64>,
+ // None if input is Null
+ end: Option<i64>,
+}
+
+/// Table state that generates a series of integers from `start`(inclusive) to
`end`(inclusive)
+#[derive(Debug, Clone)]
+struct GenerateSeriesState {
+ schema: SchemaRef,
+ _start: i64, // Kept for display
Review Comment:
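Does it need to be prefixed with `_` if it is used for display? 🤔 (A leading
`_` conventionally marks a field as intentionally unused, but the `Display`
impl reads `self._start`.)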
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]