bubulalabu commented on PR #18535:
URL: https://github.com/apache/datafusion/pull/18535#issuecomment-3596204994
Thanks @alamb for the detailed feedback! This is really helpful. Let me
address your concerns:
## On the Single API Question
> However, I think it is important to make a single API for TableFunctions
that can handle both cases:
> 1. All arguments are constants (what is handled today)
> 2. The arguments are actually tables (what this PR supports)
I agree. The proposed `BatchedTableFunctionImpl` trait covers both of those
cases, plus a third:
**1. Constant arguments** (what's handled today):
```sql
SELECT * FROM generate_series(1, 100)
```
**2. LATERAL joins** (row-correlated arguments):
```sql
SELECT * FROM t CROSS JOIN LATERAL generate_series(t.start, t.end)
```
**3. Table-valued parameters** (table as argument - future work):
```sql
SELECT * FROM generate_series(SELECT start, end FROM t)
```
The key idea is that the same trait works for all cases:
- **Constant arguments**: The planner evaluates the constant expressions
into a single-row batch, calls `invoke_batch` once with arrays of length 1, and
returns the results. This is what the PR implements as "standalone batched
table function."
- **LATERAL joins**: The execution node receives batches from the input
table, calls `invoke_batch` with column arrays, uses `input_row_indices` to
correlate outputs to inputs, and combines them. This is fully implemented in
the PR.
- **Table-valued parameters**: The execution node would receive batches from
the subquery, call `invoke_batch` with the subquery's column arrays, and pass
through the results. The trait already provides all the building blocks for
this - it's just a different execution node (future work).
Same trait, different execution strategies based on the logical plan node.
## On the Execution Model
> I don't understand this execution model -- my mental model is that a table
function receives an arbitrary table as input, and produces an arbitrary table
as output.
>
> So in my mind this corresponds to getting a SendableRecordBatchStream as
input and returning a SendableRecordBatchStream as output
That's what's happening. The key is where the stream boundaries are.
The trait signature is:
```rust
async fn invoke_batch(
    &self,
    args: &[ArrayRef],
    projection: Option<&[usize]>,
    filters: &[Expr],
    limit: Option<usize>,
) -> Result<Stream<BatchResultChunk>>
```
But in the execution context:
- **Input**: The execution node (`BatchedTableFunctionExec`) receives a
`SendableRecordBatchStream` from its child
- **Processing**: For each `RecordBatch` from that stream, it evaluates the
argument expressions to get `&[ArrayRef]`, then calls `invoke_batch`
- **Output**: The execution node returns a `SendableRecordBatchStream` that
chains together all the result chunks
So from the execution plan perspective, it's exactly
`SendableRecordBatchStream` → `SendableRecordBatchStream`.
The reason the trait operates on `&[ArrayRef]` instead of
`SendableRecordBatchStream` is to match DataFusion's existing function patterns:
- `ScalarUDFImpl`: `&[ArrayRef]` → `ArrayRef`
- `AggregateUDFImpl`: `&[ArrayRef]` → scalar
- `WindowUDFImpl`: `&[ArrayRef]` (partition) → `ArrayRef`
- **`BatchedTableFunctionImpl`**: `&[ArrayRef]` → `Stream<BatchResultChunk>`
All of these receive already-evaluated array arguments. The execution nodes
handle the streaming/batching.
## On Buffering Concerns
> It assumes the entire input will be buffered in memory, which is different
than most of the rest of DataFusion which is "streaming" (operates on a batch
at a time)
I wanted to clarify this - the implementation is fully streaming using
`SendableRecordBatchStream` throughout:
```rust
impl ExecutionPlan for BatchedTableFunctionExec {
    fn execute(&self, partition: usize, context: Arc<TaskContext>)
        -> Result<SendableRecordBatchStream>
    {
        // Get streaming input
        let input_stream = self.input.execute(partition, context)?;
        // Return wrapped stream
        Ok(Box::pin(BatchedTableFunctionStream {
            input_stream, // SendableRecordBatchStream
            // ...
        }))
    }
}
```
**Execution flow:**
1. `input.execute()` returns `SendableRecordBatchStream`
2. `BatchedTableFunctionStream` polls this stream for batches
3. For each batch: evaluate args, call `invoke_batch`, stream results
4. No buffering - processes one RecordBatch at a time
The "batching" refers to DataFusion's natural RecordBatch size, not
buffering the entire table.
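To make the one-batch-at-a-time claim concrete, here is a minimal std-only model of that flow (all names here are hypothetical stand-ins, not the PR's actual types): each input "batch" is transformed independently and the per-batch results are chained through a lazy iterator, so nothing accumulates the whole input.

```rust
// Hypothetical stand-in for invoke_batch: expand each argument value v
// into the rows 0..v (a toy generate_series).
fn invoke_batch(args: &[i64]) -> Vec<i64> {
    args.iter().flat_map(|&v| 0..v).collect()
}

// Hypothetical stand-in for BatchedTableFunctionStream: only one batch is
// in flight at a time; the per-batch results are chained lazily.
fn process(batches: &[Vec<i64>]) -> Vec<i64> {
    batches.iter().flat_map(|b| invoke_batch(b)).collect()
}

fn main() {
    // Two input batches, processed independently and chained.
    let out = process(&[vec![1, 2], vec![3]]);
    assert_eq!(out, vec![0, 0, 1, 0, 1, 2]);
    println!("{:?}", out);
}
```

The real execution node does the same thing with `RecordBatch`es and an async stream instead of `Vec`s and iterators.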
## On the ScanArgs Extension
> It seems like it would be a much smaller / simpler API to simple add a new
field to ScanArgs
I explored this approach, but there's a fundamental timing issue:
**Problem**: `TableProvider::scan()` is called at planning time, but the
input stream only exists at execution time.
```rust
// During planning
let provider = table_function.call(args)?; // Creates TableProvider
let exec_plan = provider.scan(args)?;      // Returns ExecutionPlan,
                                           // but input stream doesn't exist yet!

// During execution
let stream = exec_plan.execute()?; // Now we have data
```
You'd need `scan()` to return an `ExecutionPlan` that "will receive a stream
later", which breaks the TableProvider abstraction.
Additionally, `TableProvider` has no concept of row correlation (which
output came from which input), which is essential for LATERAL semantics. The
join would need to happen externally, but then you've essentially reimplemented
what `BatchedTableFunctionExec` already does.
## On API Surface
> A second trait that is very similar to TableFunctionImpl and all the
resulting boiler plate / API surface (eg. registration functions, etc)
I totally understand the concern about having two similar traits. The
current `TableFunctionImpl` is a planning-time TableProvider generator:
```rust
trait TableFunctionImpl {
    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
}
```
It receives constant expressions at planning time and returns a
TableProvider.
`BatchedTableFunctionImpl` is a superset - it handles everything the current
trait does (constant arguments) plus LATERAL joins.
**On the naming:**
I used "Batched" only because `TableFunctionImpl` is already taken. The
"batched" refers to processing arrays (batches of values) at execution time,
matching other UDF traits. **Ideally this would just be called
`TableFunctionImpl`**.
## On Decorrelation and Future Compatibility
> This is what I aspire to support in DataFusion -- generic decorrelated
joins. Thus I would like to create a TableFunction API that does not need major
rework if/when we support decorrelated joins
>
> For example, it seems like LogicalPlan::LateralBatchedTableFunction is
very specialized and wouldn't be used with anything other than a LATERAL table
function.
The API doesn't prevent future decorrelation optimizations. The trait is
designed to support them:
- **Stateless**: The `invoke_batch` method has no hidden state - it's a pure
function that processes `&[ArrayRef]` arguments
- **Determinism-aware**: The `signature()` method returns a `Signature` with
volatility (Immutable/Stable/Volatile), allowing the optimizer to know if
results can be cached or if duplicate inputs produce duplicate outputs
- **Optimizer-friendly**: The explicit `LateralBatchedTableFunction` logical
node tells the optimizer this is a correlated operation
DuckDB deduplicates correlated arguments and uses hash joins to reconnect
results. A similar approach could work here:
```
Physical planning creates:
HashAggregate(input, group_by=[correlated_cols]) // Deduplicate
↓
BatchedTableFunctionExec(deduplicated, func)
↓
HashJoin(original_input, results, on=[correlated_cols])
```
For example, if a function has `Signature::volatility =
Volatility::Immutable`, the optimizer knows that identical inputs always
produce identical outputs, so it can safely deduplicate the arguments.
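Here is a minimal std-only sketch of that dedup-then-rejoin strategy (function names and types here are hypothetical, not the PR's actual plan nodes): for an `Immutable` function, evaluate each distinct argument once, then map results back to every original input row, mimicking the HashAggregate → invoke → HashJoin shape above.

```rust
use std::collections::HashMap;

// Stand-in for an expensive, deterministic (Immutable) table function.
fn expensive_immutable_fn(arg: i64) -> i64 {
    arg * arg
}

// Dedup-then-rejoin: evaluate each distinct argument once, then
// reconnect results to the original rows by argument value.
fn dedup_invoke(args: &[i64]) -> Vec<i64> {
    // "HashAggregate": compute each distinct argument exactly once.
    let mut cache: HashMap<i64, i64> = HashMap::new();
    for &a in args {
        cache.entry(a).or_insert_with(|| expensive_immutable_fn(a));
    }
    // "HashJoin": map every original row to its (shared) result.
    args.iter().map(|a| cache[a]).collect()
}

fn main() {
    // The argument 2 appears three times but is evaluated only once.
    let results = dedup_invoke(&[2, 5, 2, 2]);
    assert_eq!(results, vec![4, 25, 4, 4]);
}
```

This only preserves semantics because `Immutable` guarantees identical inputs produce identical outputs, which is exactly why `signature()` exposes volatility to the optimizer.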
The `LateralBatchedTableFunction` logical node is specialized for LATERAL,
but that's intentional - it tells the optimizer "this is a correlated
operation". When you write:
```sql
SELECT * FROM t CROSS JOIN LATERAL my_func(t.x, t.y)
```
The planner needs to know this is correlated so it can choose the right
execution strategy (nested loop, decorrelation, etc.). This is similar to how
DuckDB has specialized logical nodes for correlated operations.
## On Table-Valued Parameters
> For example, how would we plan a query that passed the results of one
query to a table function?
>
> SELECT * from my_table_func(select time, log FROM t)
This would use a different logical node but the same trait:
**For LATERAL joins:**
- Logical node: `LateralBatchedTableFunction(input, func, args=[col refs])`
- Execution node: `BatchedTableFunctionExec`
- Uses `input_row_indices` to correlate: which output rows came from which
input rows
- Combines input columns + output columns
**For table-valued parameters:**
- Logical node: `TableValuedFunction(subquery, func)` (future work)
- Execution node: Same `BatchedTableFunctionExec` (or simplified variant)
- Subquery executes, produces `SendableRecordBatchStream`
- Calls `invoke_batch` with subquery column values
- Ignores `input_row_indices` (or treats as identity mapping)
- Just passes through the transformed stream
Same trait, different logical plan nodes. The trait is the execution-time
API, the logical nodes express the semantics.
## The Unified Trait
Here's the core trait definition:
```rust
pub trait BatchedTableFunctionImpl: Send + Sync + Debug {
    fn name(&self) -> &str;

    fn signature(&self) -> &Signature; // Type checking & volatility

    fn return_type(&self, arg_types: &[DataType]) -> Result<Schema>;

    fn supports_filters_pushdown(&self, filters: &[&Expr])
        -> Result<Vec<TableProviderFilterPushDown>>;

    async fn invoke_batch(
        &self,
        args: &[ArrayRef], // N invocations as arrays of length N
        projection: Option<&[usize]>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<BatchResultStream>; // Stream of BatchResultChunk
}

pub struct BatchResultChunk {
    pub output: RecordBatch,         // Generated rows
    pub input_row_indices: Vec<u32>, // Maps each output row to input row [0..N)
}
```
Key methods:
- **`signature()`**: Defines argument types and volatility for optimization
(see decorrelation section above)
- **`return_type()`**: Infers output schema from input types during logical
planning
- **`supports_filters_pushdown()`**: Allows pushing filters into the
function for efficient execution
- **`invoke_batch()`**: Processes a batch of N invocations, returns a stream
of `BatchResultChunk`s where `input_row_indices` maps output rows back to their
source input rows
Example implementation for `generate_series`:
```rust
impl BatchedTableFunctionImpl for GenerateSeries {
    fn signature(&self) -> &Signature {
        // Volatility::Immutable means same inputs → same outputs,
        // which enables decorrelation optimizations.
        Signature::new(
            TypeSignature::Exact(vec![Int64, Int64]),
            Volatility::Immutable,
        )
    }

    async fn invoke_batch(&self, args: &[ArrayRef], ...) -> Result<BatchResultStream> {
        let starts = as_int64_array(&args[0])?; // N start values
        let ends = as_int64_array(&args[1])?;   // N end values

        // Process N invocations (where N = args[0].len()).
        // The same implementation works for:
        // - Constant arguments: N = 1, single invocation
        // - LATERAL joins: N = batch_size from the input table
        // - Table-valued parameters: N = rows from the subquery result
    }
}
```
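To show concretely how `input_row_indices` carries the correlation, here is a std-only model of a batched `generate_series` (the `Chunk` struct is a simplified stand-in for `BatchResultChunk`, using `Vec<i64>` in place of a `RecordBatch`):

```rust
// Simplified stand-in for BatchResultChunk.
struct Chunk {
    output: Vec<i64>,            // stand-in for the output RecordBatch
    input_row_indices: Vec<u32>, // maps each output row to its input row
}

// Batched generate_series over N (start, end) pairs in one call.
fn generate_series_batch(starts: &[i64], ends: &[i64]) -> Chunk {
    let mut output = Vec::new();
    let mut input_row_indices = Vec::new();
    for (row, (&s, &e)) in starts.iter().zip(ends).enumerate() {
        for v in s..=e {
            output.push(v);
            input_row_indices.push(row as u32);
        }
    }
    Chunk { output, input_row_indices }
}

fn main() {
    // Two invocations in one batch: (1..=2) and (5..=6).
    let chunk = generate_series_batch(&[1, 5], &[2, 6]);
    assert_eq!(chunk.output, vec![1, 2, 5, 6]);
    // Rows 0 and 1 of the output came from input row 0; rows 2 and 3
    // from input row 1 — that is the explicit correlation.
    assert_eq!(chunk.input_row_indices, vec![0, 0, 1, 1]);
}
```

The execution node uses exactly this mapping to glue input columns onto output rows for LATERAL semantics, and can ignore it (identity mapping) for the pass-through table-valued-parameter case.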
## Comparison with DuckDB's Approach
**DuckDB's implicit correlation:**
- Table functions maintain internal state tracking which input row they're
processing
- Processes input rows sequentially
- Correlation is implicit in execution order
- State is in `LocalTableFunctionState`
**This PR's explicit correlation:**
- Table functions are stateless - no hidden state
- Can process input rows in any order
- Correlation is explicit via `input_row_indices`
- Simpler to reason about and debug
The explicit approach offers more flexibility for future optimizations like
parallel invocation or reordering, whereas DuckDB's stateful model requires
sequential processing.
Additionally, the batched invocation model gives implementations more
freedom:
- They can decide to process inputs in parallel (e.g., spawn multiple
threads to handle different ranges)
- They can apply context-specific optimizations (e.g., vectorized
processing, SIMD operations across the batch)
- They can amortize setup costs across multiple invocations (e.g., single
connection pool checkout for N database queries)
That said, DuckDB's approach is proven in production and there may be
advantages I'm not seeing!
## Performance Benchmarks
I ran some benchmarks comparing this implementation against DuckDB v1.3.2 to
get a sense of how the batched approach performs for LATERAL joins:
**LATERAL Join Performance:**
| LATERAL Calls | DuckDB Time (s) | DataFusion Time (s) | Speedup |
|---------------|-----------------|---------------------|---------|
| 100,000 | 0.06 | 0.01 | 6.0x |
| 500,000 | 0.16 | 0.01 | 16.0x |
| 1,000,000 | 0.32 | 0.02 | 16.0x |
| 2,000,000 | 0.55 | 0.02 | 27.5x |
| 5,000,000 | 1.33 | 0.03 | 44.3x |
| 10,000,000 | 2.64 | 0.06 | 44.0x |
| 20,000,000 | 5.24 | 0.10 | 52.4x |
Key findings:
- The batched processing scales well - the speedup grows from 6x at 100K
calls to 52x at 20M calls
- It handles large numbers of correlated calls efficiently (20 million calls
in 0.10 seconds)
- The performance advantage grows with scale
Note: These are microbenchmarks using `generate_series`.
The results suggest the explicit `input_row_indices` approach is more
efficient than DuckDB's stateful iteration for this use case.