gstvg commented on PR #18921:
URL: https://github.com/apache/datafusion/pull/18921#issuecomment-3573747241
# Traversing Expr trees with a schema that includes lambda parameters
The parameters of a lambda aren't present in the schema of the plan they
belong to. During tree traversals that use a schema to check an expression's
datatype, nullability and metadata, there must be a way to access a schema
which includes those parameters.
<details><summary>Expr tree traversal with wrong schema</summary>
```sql
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
╷ !! missing "i", incorrect "b" type !! ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
╷ !! missing "i", incorrect "b" type !! ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
╷ !! missing "i", incorrect "b" type !! ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
```
</details>
## Option 1. Per-lambda schema with a new set of TreeNode methods: *_with_schema
Once again, this PR adds another set of TreeNode-like methods on
[logical](https://gstvg.github.io/datafusion/index.html?search=expr%3A%3Awith_schema)
and
[physical](https://gstvg.github.io/datafusion/physical_expr/trait.PhysicalExprExt.html)
expressions. While traversing an expression tree, whenever these methods find a
ScalarUDF with a lambda among its arguments, they use
ScalarUDF::lambdas_parameters to build a schema adjusted for each of its
arguments and pass it as an argument to the visiting/transforming function.
```rust
impl Expr {
pub fn transform_with_schema<
F: FnMut(Self, &DFSchema) -> Result<Transformed<Self>>,
>(
self,
schema: &DFSchema,
f: F,
) -> Result<Transformed<Self>> {}
}
```
Example usage:
```rust
pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
    let mut has_placeholder = false;
    // Provide the schema as the first argument.
    // The transforming closure receives an adjusted_schema as argument.
    self.transform_with_schema(schema, |mut expr, adjusted_schema| {
        match &mut expr {
            // Default to assuming the arguments are the same type
            Expr::BinaryExpr(BinaryExpr { left, op: _, right }) => {
                // Use adjusted_schema and not schema: these expressions may contain
                // columns referring to a lambda parameter, whose Field would only be
                // available in adjusted_schema and not in schema.
                rewrite_placeholder(left.as_mut(), right.as_ref(), adjusted_schema)?;
                rewrite_placeholder(right.as_mut(), left.as_ref(), adjusted_schema)?;
            }
            ....
```
In order to add the lambda parameters to the schema, we need to take into
account this DFSchema property: "Unqualified fields must be unique not only
amongst themselves, but also must have a distinct name from any qualified
field names".
Since lambda parameters are always unqualified, they may conflict with
columns of the outer schema, even though those columns are qualified. To
resolve this conflict, we can either:
1: Replace the existing column with the lambda parameter at the same index in
the schema's vec of fields, so that the indices of the columns to its right
don't change. That's the current approach in this PR (a rough sketch follows
the diagram below).
<details><summary>Expr tree traversal with adjusted schema, replacing
conflicts</summary>
```sql
+------------------------------------------------------------------+
| array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) |
| |
| a = Int32 |
| b = List(List(Int32)) |
| c = Int32 |
+------------------------------------------------------------------+
|
|
v
+------------------------------------------------------------------+
| (b, i) -> array_transform(b, b -> b + c + i) |
| |
| a = Int32 |
| b = List(Int32) ! replaced ! |
| c = Int32 |
| i = Int32 |
+------------------------------------------------------------------+
|
|
v
+------------------------------------------------------------------+
| array_transform(b, b -> b + c + i) |
| |
| a = Int32 |
| b = List(Int32) ! replaced ! |
| c = Int32 |
| i = Int32 |
+------------------------------------------------------------------+
|
|
v
+------------------------------------------------------------------+
| b -> b + c + i |
| |
| a = Int32 |
| b = Int32 ! replaced ! |
| c = Int32 |
| i = Int32 |
+------------------------------------------------------------------+
```
</details>
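To make variant 1 concrete, here is a rough sketch of the replace-in-place idea. It deliberately uses a simplified (qualifier, Field) list instead of the real DFSchema API, and the helper name is made up for illustration:
```rust
use arrow::datatypes::Field;

// Simplified stand-in for DFSchema: a flat list of (optional qualifier, field).
type SimpleSchema = Vec<(Option<String>, Field)>;

/// Splice a lambda parameter into the outer schema. If a field with the same
/// (unqualified) name already exists, replace it at its original index so the
/// indices of the columns to its right don't change; otherwise append it.
fn with_lambda_param(mut outer: SimpleSchema, param: Field) -> SimpleSchema {
    let conflict = outer.iter().position(|(_, f)| f.name() == param.name());
    match conflict {
        // shadow the conflicting column in place
        Some(i) => outer[i] = (None, param),
        // no conflict: the parameter becomes a new unqualified field
        None => outer.push((None, param)),
    }
    outer
}
```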
<br>
<br>
2: Rename the shadowed column to a unique, non-conflicting name and add the
lambda parameter to the end of the schema's vec of fields. This option allows
checking whether a physical column refers to a lambda parameter by checking if
its index is greater than or equal to the number of fields of the outer schema.
When this information is available, it eliminates the need for the
with_lambdas_params variations of the TreeNode methods. It's trivial to change
the PR to use this approach (a rough sketch follows the diagram below).
<details><summary>Expr tree traversal with adjusted schema, renaming
conflicts</summary>
```sql
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ a = Int32 ╷
╷ b_shadowed1 = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ b = List(Int32) ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ a = Int32 ╷
╷ b_shadowed1 = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ b = List(Int32) ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ a = Int32 ╷
╷ b_shadowed1 = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ b_shadowed2 = List(Int32) ╷
╷ b = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
```
</details>
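And a rough sketch of variant 2 (rename and append), over the same simplified representation; the "_shadowedN" naming just mirrors the diagram above and is not taken from the PR:
```rust
use arrow::datatypes::Field;

// Same simplified stand-in for DFSchema as in the earlier sketch.
type SimpleSchema = Vec<(Option<String>, Field)>;

/// Rename any conflicting outer field to a unique "<name>_shadowedN" and append
/// the lambda parameter at the end of the field list.
fn with_lambda_param_renaming(mut outer: SimpleSchema, param: Field) -> SimpleSchema {
    let conflict = outer.iter().position(|(_, f)| f.name() == param.name());
    if let Some(i) = conflict {
        // find a "_shadowedN" suffix that isn't taken yet
        let mut n = 1;
        let mut renamed = format!("{}_shadowed{n}", param.name());
        while outer.iter().any(|(_, f)| f.name() == &renamed) {
            n += 1;
            renamed = format!("{}_shadowed{n}", param.name());
        }
        let (qualifier, field) = outer[i].clone();
        outer[i] = (qualifier, field.with_name(renamed));
    }
    // the parameter always goes last, so its index is >= the outer field count
    outer.push((None, param));
    outer
}
```
Because the parameter is always appended, an index greater than or equal to the original field count can be read as "this column is a lambda parameter".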
<br>
<br>
<br>
Lambdas are usually evaluated with a different number of rows than the outer
scope: in the example, array_transform is executed with one row while its
lambda runs over two rows, one for each element of the array. The UDF
implementation is responsible for adjusting the captured columns to the number
of rows of its parameters with whatever logic makes sense for it. For
array_transform, that means copying the value of the captured column for each
element of the arrays (a rough sketch follows the diagram below):
```
copied once a [1]------------------> a 1
copied 2 times b [2, 3] --------------> b 2
\
not copied c [] ------------> b 3
```
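As a rough illustration of that copy (not the PR's implementation), a helper that repeats a captured Int32 value once per list element, using the list array's offsets and ignoring nulls:
```rust
use arrow::array::{Array, Int32Array, ListArray};

/// Repeat each row's captured value once per element of that row's list, so the
/// captured column lines up with the flattened list values the lambda body sees.
/// Null handling is omitted for brevity.
fn broadcast_captured_to_elements(captured: &Int32Array, lists: &ListArray) -> Int32Array {
    let offsets = lists.value_offsets();
    let mut out = Vec::with_capacity(lists.values().len());
    for row in 0..lists.len() {
        // number of elements in this row's list
        let elements = (offsets[row + 1] - offsets[row]) as usize;
        for _ in 0..elements {
            out.push(captured.value(row));
        }
    }
    Int32Array::from(out)
}
```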
This adjustment is costly, so it's necessary to give the implementation a way
to avoid adjusting uncaptured columns.
It's intuitive to just remove the uncaptured columns, but note in the diagram
and in the query below that this can change the index of captured columns: the
"c" column has index 2 in the outer scope but ends up with index 1 in the
other scopes.
<details><summary>Expr tree traversal with a schema with uncaptured columns
removed</summary>
```sql
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ ╷
╷ a@0 = Int32 ╷
╷ b@1 = List(List(Int32)) ╷
╷ c@2 = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ b@0 = List(Int32) ╷
╷ c@1 = Int32 ╷
╷ i@2 = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ b@0 = List(Int32) ╷
╷ c@1 = Int32 ╷
╷ i@2 = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ ╷
╷ b@0 = Int32 ╷
╷ c@1 = Int32 ╷
╷ i@2 = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
```
</details>
```sql
select a@0, b@1, c@2, array_transform(b@0, (b@0, i@2) ->
array_transform(b@0, b@0 -> b@0 + c@1 + i@2)) from t;
```
## Option 1a: Nullify uncaptured columns
To keep the indices stable, this PR doesn't remove uncaptured columns; as such,
they are still present in the adjusted schema with their original type during
tree traversals with the new _with_schema methods. However, to avoid the costly
adjustment, when the batch is passed to the UDF in invoke_with_args, uncaptured
columns are substituted with columns of the Null datatype (a rough sketch
follows the diagram below).
<details><summary>Expr execution/evaluation RecordBatch schema with
uncaptured columns substituted with Null columns</summary>
```sql
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ ╷
╷ a = Int32 ╷
╷ b = List(List(Int32)) ╷
╷ c = Int32 ╷
╷ ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ a = Null ! nullified ! ╷
╷ b = List(Int32) ╷
╷ c = Int32 ╷
╷ i = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ a = Null ! nullified ! ╷
╷ b = List(Int32) ╷
╷ c = Int32 ╷
╷ i = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ ╷
╷ a = Null ! nullified ! ╷
╷ b = Int32 ╷
╷ c = Int32 ╷
╷ i = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
```
</details>
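As a rough illustration of that substitution (the helper name is hypothetical), a function that keeps every slot of the RecordBatch but swaps uncaptured columns for NullArrays, so column indices stay stable while the uncaptured data doesn't need to be broadcast:
```rust
use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::{ArrayRef, NullArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Keep captured columns as-is and replace every other column (and its field)
/// with a Null placeholder of the same length, preserving all column indices.
fn nullify_uncaptured(
    batch: &RecordBatch,
    captured: &HashSet<usize>,
) -> Result<RecordBatch, ArrowError> {
    let schema = batch.schema();
    let (fields, columns): (Vec<Field>, Vec<ArrayRef>) = schema
        .fields()
        .iter()
        .zip(batch.columns())
        .enumerate()
        .map(|(i, (field, column))| {
            if captured.contains(&i) {
                (field.as_ref().clone(), Arc::clone(column))
            } else {
                // uncaptured: same slot, but a free-to-carry Null column
                (
                    Field::new(field.name().to_string(), DataType::Null, true),
                    Arc::new(NullArray::new(batch.num_rows())) as ArrayRef,
                )
            }
        })
        .unzip();
    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}
```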
## Option 1b: TreeNode *_with_indices_mapping
To avoid keeping uncaptured columns in the schema and substituting them with
null in the batch, it's possible to add another set of TreeNode-like methods on
physical expressions that call the visiting/transforming function with a second
parameter of type HashMap<usize, usize>, mapping the indices of the current
scope to those of the outermost scope. This requires that, before calling the
visiting/transforming function for a physical lambda expression, its whole
subtree be visited to collect all the captured columns and build the indices
mapping. Inner lambdas require repeating the process and can't reuse the work
done for the outer lambda, which may be costly for deeply nested lambdas or
lambdas with complex expressions.
```rust
impl PhysicalExprExt for Arc<dyn PhysicalExpr> {
fn transform_with_indices_mapping<
F: FnMut(Self, &HashMap<usize, usize>) -> Result<Transformed<Self>>,
>(
self,
mut f: F,
) -> Result<Transformed<Self>> {}
}
```
<details><summary>Expr tree traversal with indices_mapping: "c" has index 2
in the root scope but 1 in the others</summary>
```sql
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ ╷
╷ indices_mapping = {} ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ indices_mapping = { 1 => 2 } ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ ╷
╷ indices_mapping = { 1 => 2 } ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ ╷
╷ indices_mapping = { 1 => 2 } ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
```
</details>
The code on minimize_join_filter would change to:
```rust
fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply_with_indices_mapping(|expr, indices_mapping| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // This column may be a child of a lambda, in which case its index refers
            // to the lambda-scoped schema, which won't include uncaptured columns
            // from the plan input and therefore may differ from the indices of the
            // input plan's schema. In such cases, indices_mapping contains the
            // mapping to the index in the input plan. If no mapping is found, the
            // column should be referring to a lambda parameter.
            let scoped_index = col.index();
            if let Some(plan_index) = indices_mapping.get(&scoped_index) {
                used_columns.insert(*plan_index);
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}
```
## Option 2. Create a schema with all parameters from all lambdas for tree traversals
Use a secondary schema containing all parameters from all lambdas. For that,
expressions must be transformed, normalizing all lambda parameters and their
references with a unique qualifier per lambda, so they can coexist without
conflicts in this schema. A qualifier field would be added to the lambda expr:
```rust
pub struct Lambda {
pub qualifier: Option<String>,
pub params: Vec<String>,
pub body: Box<Expr>,
}
```
Schema of the example:
```
t.a: Int32
t.b: List(List(Int32))
lambda1.b: List(Int32)
lambda1.i: UInt32
lambda2.b: Int32
```
From my understanding of this
[video](https://www.youtube.com/watch?v=tcFsgFXV0sM&t=1076s), this is similar
to what DuckDB does in its binder, although with differences in the evaluation
part. I didn't find similar resources for other open source engines with
lambda support, like ClickHouse and Spark.
This works well when dealing with plan nodes, where, during plan creation or
schema recomputation, we can normalize their lambdas, create the extended
schema and save it as a plan field, exposing it with a method like
"lambda_extended_schema", although with an added cost to plan creation/schema
recomputation. The lambda normalization actually requires two passes: a first
pass collects any existing lambda qualifiers so that the second, normalizing
pass doesn't reuse them (a rough sketch of the qualifier bookkeeping follows).
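A rough sketch of just that qualifier bookkeeping (the function name is illustrative; the real normalization would walk the Expr tree):
```rust
use std::collections::HashSet;

/// Pass 1 walks the tree and inserts every qualifier already present on a lambda
/// into `used`. Pass 2 then calls this for each lambda that still needs one,
/// guaranteeing the generated "lambdaN" qualifiers never collide.
fn next_lambda_qualifier(used: &mut HashSet<String>) -> String {
    let mut n = 1usize;
    loop {
        let candidate = format!("lambda{n}");
        // insert() returns false if the qualifier is already taken
        if used.insert(candidate.clone()) {
            return candidate;
        }
        n += 1;
    }
}
```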
How the code using the extended schema would look:
```rust
//from
expr.transform_with_schema(plan.schema(), |node, adjusted_schema| ...)
//to
let schema = plan.lambda_extended_schema();
expr.transform(|node| ...)
```
Another example:
```rust
impl LogicalPlan {
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        self.transform_up_with_subqueries(|plan| {
            // Use plan.lambda_extended_schema(), which contains the lambda
            // parameters, instead of plan.schema(), which won't.
            let lambda_extended_schema = Arc::clone(plan.lambda_extended_schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                // If this expression is a child of a lambda and contains columns
                // referring to its parameters, lambda_extended_schema already
                // contains them.
                let (e, has_placeholder) =
                    e.infer_placeholder_types(&lambda_extended_schema)?;
                ....
```
However, when working with functions/methods that deal directly with
expressions, detached from a plan, the expression's lambdas may be
unnormalized and the extended schema is unavailable. There are a few public
methods/functions like that, infer_placeholder_types for example:
```rust
impl Expr {
pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr,
bool)> {
let mut has_placeholder = false;
self.transform(|mut expr| ...)
...
}
}
```
It could either:
1: Require that it only be called with normalized expressions and that the
schema argument be the extended schema, returning an error otherwise, which is
restrictive and puts strain on users.
2: Allow it to be called with unnormalized expressions: visit the whole expr
tree collecting the existing lambda qualifiers to avoid duplicate qualifiers in
the next step, perform a first transformation to guarantee that the
expression's lambdas are normalized, create the extended schema, and only then
perform the second transformation to infer the placeholder types using the
extended schema. While the documentation can state that the returned expression
is normalized, it's still a regular Expr, which doesn't encode that property in
its type. Also, without changing the method signature, it wouldn't even be
possible to return the extended schema so it could be reused elsewhere without
recomputation. This is costly, and its costly work can't be reused.
<br>
<br>
Normalized example:
```sql
select t.a, t.b, array_transform(t.b, (lambda1.b, lambda1.i) ->
array_transform(lambda1.b, lambda2.b -> lambda2.b + t.a + lambda1.i)) from t;
```
Just like the first option, this also sets uncaptured columns to Null,
as well as unavailable/out-of-scope lambda parameters.
<details><summary>Expr tree batch evaluation with a single extended
schema</summary>
```sql
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, (b, i) -> array_transform(b, b -> b + c + i)) ╷
╷ ╷
╷ t.a = Int32 ╷
╷ t.b = List(List(Int32)) ╷
╷ t.c = Int32 ╷
╷ lambda1.b = Null ╷
╷ lambda1.i = Null ╷
╷ lambda2.b = Null ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ (b, i) -> array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ t.a = Null ╷
╷ t.b = Null ╷
╷ t.c = Int32 ╷
╷ lambda1.b = List(Int32) ╷
╷ lambda1.i = Int32 ╷
╷ lambda2.b = Null ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ array_transform(b, b -> b + c + i) ╷
╷ ╷
╷ t.a = Null ╷
╷ t.b = Null ╷
╷ t.c = Int32 ╷
╷ lambda1.b = List(Int32) ╷
╷ lambda1.i = Int32 ╷
╷ lambda2.b = Null ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
│
▼
┌╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┐
╷ b -> b + c + i ╷
╷ ╷
╷ t.a = Null ╷
╷ t.b = Null ╷
╷ t.c = Int32 ╷
╷ lambda1.b = Null ╷
╷ lambda1.i = Int32 ╷
╷ lambda2.b = Int32 ╷
└╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶╶┘
```
</details>
With this option, indices are always stable across the whole tree.
This option also allows checking whether a physical column refers to a lambda
parameter by checking if its index is greater than or equal to the number of
fields of the outer schema. When this information is available, it eliminates
the need for the with_lambdas_params variations of the TreeNode methods.
### Comparison between options:
| * | per-lambda schema with uncaptured columns set to null | per-lambda schema with indices_mapping | single extended schema |
|:---|:---|:---|:---|
| New set of TreeNode methods | Yes, 1: _with_schema for both logical and physical expressions | Yes, 2: _with_schema for both logical and physical expressions, and _with_indices_mapping for physical expressions | No |
| Tree traversal added cost | Only when encountering a lambda | Only when encountering a lambda | Zero |
| Plan creation / recompute schema added cost | Zero | Zero | Always, regardless of the existence of any lambda |
| Code change, internal | New set of TreeNode methods and using them instead of the current ones when applicable | 2 new sets of TreeNode methods and using them instead of the current ones when applicable | Untried, unpredictable |
| Code change, downstream | If lambda support is desired, use the new TreeNode methods instead of the current ones when applicable; otherwise none | If lambda support is desired, use the new TreeNode methods instead of the current ones when applicable; otherwise none | Variable: medium when closely associated with a Plan, just call plan.lambda_extended_schema(); unpredictable when a plan is unavailable or doesn't exist |
| Change uncaptured columns' DataType to Null | Yes | No | Yes |
| Unneeded Null columns present in the schema during planning/optimizing and in the RecordBatch during execution as padding/filler to keep indices stable | Yes | No | Yes |
| Stable column indices across the whole expr tree | Yes | No | Yes |
| Makes _with_lambdas_params unnecessary for physical expressions if Expr::Column is used | No | Yes | No |
## Splitting this into smaller PRs
If it's decided that this PR should move forward, it will likely be as smaller
PRs. In that case, I have already planned a division, shown below. It's not
necessary to analyze the graph, as it doesn't help with the discussion in this
text; it's included just to show a reasonable granularity of smaller PRs I
could find, in case it helps decide whether to move this forward or not.
Each rectangular node is a PR. Asymmetrical nodes are a collection of smaller
PRs which share the same dependencies and aren't a dependency of any other PR.
Green nodes can be opened immediately. Gray ones have unmet dependencies.
Merged PRs will be colored blue.
Left-to-Right [full-screen
link](https://mermaid.live/view#pako:eNqNWG2P0zgQ_itWpJO408L1fSEfkLph0YFSLkfLfbh0ZXkTtw04drAddgviv984TtK8tWwkaDKZV8_MM5P94UQipo7r7CXJDsj_uOUIrogRpd7QHbpnOUW7hDHX3HVe7iWl3L4tbrfcMqj83qrbOut__K2DQvhBL2f-396d5TAX0ILiRUCkomgx773-VLz-xLOCYfzqxEB5bG_s_74f-mKfRISh28dMuq5P0vuYoPFoZGRct_LPMqPnz18jb30bwj80H53UBmHACEes1MWsloRrgbLDURVECgbQpOHspzdvx-HWuX3U4BVaAw-RQAvfpRm7Q9PCha1zYp6ENU9lQUUHmhJ0oCyjUlVuW4nlKvwg4JRzHulEcPCASEmOWEvC1U7IFE2umyGs3_qlgbelCApK383ZVCbJA5EUvLOSpaw9GaPiFNo50nJVif32m_cOUukJlqfcdROFrQ0MeSMp1VSixcnBXonYjP2BHxJ9wPYkts6pDvzNh3CABS2DdyiGIpwPpLiUuwlT8oWiIenWKTTPrxQtQoTf23VhHK0TyGaySyCWVnZK9uXrpsvmytWw4YRDBSVKJ3yPTOu1pTbHjCJPUBmZvI3HRe20ODhknLDkO8WRYGFDs8KExxjCSvZ5oo8YiNEXqL9xT8WaKgXq15po6rqRpPCLq_rGRX3PekI2loTvqMQZIxE9CBbDvTYe920ss4wdqwL8SB9koqkp7B6jyHRqosmk-EwLbuW60gpYXyb9AL6yjfhImetqeTSSEUSE91LkGb4_4pxzqvSQMfWVIUmVYN-K44NyVWg-ECqBZFNkQgM2mwwc01R0ddZt3cGjwA-rnkMlFBVVNB697BdrryGa7XqhMQJojAusdYNYrOo3SHCubC8phfIN6kytSJaZOp72U9TU0eidST8nOX-AuHFElMYJh_N
OATYSZcq_z7xURx4ZsxQ82yVQ8JJCRVIemeo6m0kvV1qkHphQTceWMcl00dHDWS1PqYSRS8fSgpPxYnR2UtVpjsB0Lhvt761Dz9KAKcuE1DU610Dj_QWDxgf0qMTrkVGYfDZ7HM9_b7m_KkRWVO5pLWMchyGyz1PKtSpVoIXB6GeTx-m0rWG58jZhd-REHU8n3YhPw6gctiWhRNbquZzFZktoPgenYXkC4j7tpkMKalIDwZc9yk3Llt8TMvpaLPXMO7052WyPzvZTrbqKe5CyrAZ9OVe9jSV46w4B8t-hrFqUblx-MDHEc7PWFq2yY1p1Zq4fNLQMzNG2cKsB7jqKwkuCw2Pc2G6UoLl6UGWGAwYoYNCU94xiKEwD6sOIbhwgcVyivkENFcJDDg1w9yfhx-YLNO5rKIxpUU-NAWyyr8wsK8bWgJICsEoNJ-CKzcyyIqNLIoCOMX2kg5Lzy3sC-tUigX-1SAyFI2mxCJwzUKMFhrE8lPrBcBkAchGTwvQbYTkxgwZCVjnTGO5omoFP90RHhyEFDZ-UOR0OC-iZtSMxeweURZarA47FA8fWOqxEgmsCk24gZrtZr_P7rzmVx414L4Bv1te_MSW5ZAlR5fIzOPuq-bQ-SBrHNH6v6mVJWvQ_t2t0caZs9adsEufbPqjbPmi0_ROUdNu_ozJ8ioqnAMGNuXpAYKf70GCf9vcSIiFzOoCcm5R7prbNBtDnNGghdjtFten7osv6THkWk2pPHaiycqnFQsKqbDalgRoozoLTh2qVGEYWZtRcAB9Y4mGp3_PClUuM8N1OoJWGTmt2NgDzMZKVR2YT1NmrzQ5QM1zWMySuwf0Uf4ZewrlOAMIHVeTcHGEGrQKea4qqr4QSHs8AYdVjZeD_AqbQoegnl9rNuXL2Mokd-ODI6ZWTUsBO8-j8MExbR0Pq6NZx4TYm8oup0J8gkxH-nxBpJQYfKPuD4-4IU_Bka-dNQqBbTyxgjkrPDCbHnRca
HPeH8-i409H1i9F8Mno1nc4ms8liPL1yjsAzfTFfTOaj8XQ8G79azK9_XjnfC5ujFy-vQQOclhZyZf_KU_yx5-f_S_hr8A)
<details>
```mermaid
graph LR
classDef blue fill:blue
classDef green fill:green
subgraph "SQL" [SQL 84LOC]
SQLP[SQL Parse 65LOC]
SQLU[SQL Unparse 19LOC]
end
LL[Logical Expr::Lambda 100LOC]:::green
LL --> CSE[CSE 50LOC]
P[Plan logical lambda into physical expr 25LOC]
UDF1["Extend ScalarUDF[Impl] 300LOC"]
UDF2[ScalarUDF lambda schema helpers 100LOC]
AM[Non functional array_transform 270LOC]
PSFL[ScalarFunction PhysicalExpr lambda aware 30LOC]
PL --> PSFL
UDF1 --> PSFL
UDF1 --> AM
%%CILP[Column::is_lambda_parameter 6LOC]
subgraph "Expr::*_with_schema"
LTN[Expr::*_with_schema API def 50LOC]:::green
LTNB[make Expr::*_with_schema lambda aware 70LOC]
LTN --> LTNES[Expr Simplifier 100LOC]
LTNA>"
use Expr::*_with_schema in existing code
Type Coercion 110LOC
normalize_col[with_schemas_and_ambiguity_check] 31LOC
SessionState::create_physical_expr 4LOC
Expr::infer_placeholder_type 1LOC
ApplyFunctionRewrites 10LOC
optmize_projections::rewrite_expr 2LOC
SqlToRel::try_process_group_by_unnest 10LOC
sql resolve_columns 5LOC
Example type_coercion_demo 10LOC
"]
end
PL[Physical Lambda Expr 108LOC]:::green
subgraph "PhysicalExpr::*_with_schema"
PTN[PhysicalExpr::*_with_schema API def 25LOC]:::green
PTNA>"
use PhysicalExpr::*_with_schema in ProjectionMapping 32LOC
PhysicalExprSimplifier 20LOC
unwrap_cast_in_comparison 10LOC
AsyncMapper::find_references 1LOC
Example CustomCastsPhysicalExprAdapter 10LOC
"]
PTNB[make PhysicalExpr::*_with_schema lambda aware 160LOC]
end
subgraph capture
CS[Capture support 30LOC]
LCH["List capture helpers 60LOC(4x15)"]
MCH["Merge capture with arguments helper 66LOC(2x33)"]
AMCT[array_transform capture support 20LOC]
end
PSFL --> CS
PTN --> CS
LL --> SQL
LL --> P
UDF2 --> LTNES
UDF2 --> LTNB
UDF2 --> PTNB
LTN --> LTNA
LTN --> LTNB
LL --> LTNB
LTN --> UDF2
LL --> UDF1 --> UDF2
UDF2 --> P
PL --> P
PL --> PTNB
PTN --> PTNB
PTN --> PTNA
AM --> AMCT
CS --> AMCT
LCH --> AMCT
MCH --> AMCT
LL --> LTNLP2
subgraph "Expr::*_with_lambdas_params"
LTNLP --> LTNLP2[make Expr::*_with_lambdas_params lambda aware]
LTNLP[Expr::*_with_lambdas_params API def 50LOC]:::green -->
AAAAA>"
expr_applicable_for_cols 5LOC
Expr::add_column_refs[_counts]/any_column_refs 15LOC
expr_to_columns 10LOC
columnize_expr 15LOC
find_columns_referenced_by_expr 10LOC
find_column_indexes_referenced_by_expr 5LOC
normalize_col 10LOC
normalize_col_with_schemas_and_ambiguity_check 15LOC
replace_col 10LOC
transform_up_with_lambdas_params 10LOC
filter_exprs_evaluation_result_on_empty_batch 10LOC
replace_cols_by_name 10LOC
optimizer::push_down_filter::contain 15LOC
ScalarSubqueryToJoin 40LOC
TableAliasRewriter 20LOC
Example ShreddedJsonRewriter 30LOC
"]
end
PL --> PTNLP2
subgraph "PhysicalExpr::*_with_lambdas_params"
PTNLP --> PTNLP2[make PhysicalExpr::*_with_lambdas_params lambda aware]
PTNLP[PhysicalExpr::*_with_lambdas_params API def 50LOC]:::green -->
BBBBB>"
CustomPhysicalExprAdapter 3LOC
ParquetPushdownChecker 13LOC
add_offset_to_expr 3LOC
update_expr 10LOC
project_ordering 20LOC
with_new_schema 10LOC
collect_columns 10LOC
reassign_expr_columns 10LOC
DefaultPhysicalExprAdapter 40LOC
projection pushdown 50LOC
sort pushdown 40LOC
projection 50LOC
stream_join_utils 40LOC
pruning predicate rewrite_column_expr 5LOC
Example DefaultValuePhysicalExprAdapter 20LOC
"]
end
```
</details>
<br>
<br>
Top-to-Bottom [full-screen
link](https://mermaid.live/view#pako:eNqNWG2P0zgQ_itWpJO408K12baUfEDqhkUHarkcLffh0pXlTdw24NjBdtgtiP9-4zhJ89aykaDJZF49M89M9ocTiZg6nrOXJDugzc2WI7giRpR6Q3fonuUU7RLGPHPXebmXlHL7trjdcsug8nurbuus_1luHRTCD5pPln_7d5bDXEALihcBkYqi2bT3-lPx-hPPCobxqxMD5bG9sf8vl-FS7JOIMHT7mEnPW5L0PiZoPBoZGc-r_LPM6Pnz18hf34bwD01HJ7VBGDDCESt1Masl4Vqg7HBUBZGCAeQ2nP305u043Dq3jxq8QmvgIRJo4bs0Y3founBh65yY3bDmqSyo6EBTgg6UZVSqym0rsViFHwSccs4jnQgOHhApyRFrSbjaCZki92UzhPXbZWngbSmCgtJ3czaVSfJAJAXvrGQpa0_GqDiFdo60WFViv_3mv4NU-oLlKfe8RGFrA0PeSEo1lWh2crBXIjZjf-CHRB-wPYmtc6qD5eZDOMCCFsE7FEMRTgdSXMrdhCn5QtGQdOsUmudXihYhwu_tujCO1glkM9klEEsrOyX74nXTZXPlathwwqGCEqUTvkem9dpSm2NGkS-ojEzexuOidlocHDJOWPKd4kiwsKFZYcJjDGEl-zzRRwzE6AvU37inYk2VAvVrTTT1vEhS-MVVfeOivic9IRtLwndU4oyRiB4Ei-FeG4_7NhZZxo5VAX6kDzLR1BR2j1FkOjXRZFJ8pgW38jxpBawvbj-Ar2wjPlLmeVoejWQEEeG9FHmG748455wqPWRMfWVIUiXYt-L4oFwVmg6ESiDZFJnQgM0mA8c0FV2ddVt38ChYhlXPoRKKiioaj-b9Yu01RLNdLzRGAI1xgbVuEItV_QYJzpXtJaVQvkGdqRXJMlPH1_0UNXU0esft5yTnDxA3jojSOOF
w3inARqJM-feZF-rII2OWgme7BApeUqhIyiNTXWcz6edKi9QHE6rp2CImmS46ejir5SmVMHLpWFpwMp6Nzk6qOs0RmM5lo_39dehbGjBlmZC6RucaaPy_YNAsAT0q8XpkFCafTR7H099b7q8KkRWVe1rLGMdhiOzzlHKtShVoZjD6mft4fd3WsFj5m7A7cqKOp2434tMwKodtSSiRtXouZ7HZEprPwWlYnoC4T7vpkIKa1EDwRY9y07K17AkZfS2Weuad3pxstkdn-6lWXcU9SFlUg76cq_7GEvx1hwD571BWLUo3rmXgGuK5WWuLVtkxrTozdxk0tAzM0bZwqwHuOorCS4LDY9zYbpSguXpQZYYDBihg0JT3jGIoTAPqw4huHCBxXKK-QQ0VwkMODXD3J-HH5gs07msojGlRT40BbLKvzCwrxtaAkgKwSg0n4IrNzLIio0sigI4xfaSDktPLewL61SKBf7VIDIUjabEInDNQowWGsTyU-sFwGQByEZPC9BthOTGDBkJWOdMY7miagU_3REeHIQUNn5Q5HQ4L6Jm1IzF7B5RFlqsDjsUDx9Y6rESCawKTbiBmu1mv8_uvOZXHjXgvgG_S178xJblgCVHl8jM4-6r5tD5IGsc0fq_qZUla9D-3a3Rxpmz1p2wS59s-qNs-aLT9E5R027-jMnyKiqcAwY25ekBgp_vQYL_u7yVEQuZ0ADk3KfdNbZsNoM9p0ELsdopq0_dFl_WZ8iwm1Z46UGXlUouFhFXZbEoDNVCcBacP1SoxjCzMqLkAPrDEw1K_54Urlxjhu51AKw2d1uRsAOZjJCuPzCaos1ebHaBmuKxnSFyD-yn-DL2Ec50AhA-qyLk5wgxaBTzXFFVfCSU8ngHCqsfKwP8FTKFD0buX2s25cvYyiR344MjplZNSwE7z6PwwTFtHQ-ro1vHgNibyi6nQnyCTEf6fEGklBh8o-4Pj7QhT8GRr501CoFtPLGCOSt8MJseb
Fhoc74fzCA_TF6OpOxqN5pPpZDx151fO0fHG7vzFdOZOJ_PJ7Ho2mb1yf1453wuboxfzl6ABTksLubJ_5Sn-2PPzf8Ata_o)
<details>
```mermaid
graph TB
classDef blue fill:blue
classDef green fill:green
subgraph "SQL" [SQL 84LOC]
SQLP[SQL Parse 65LOC]
SQLU[SQL Unparse 19LOC]
end
LL[Logical Expr::Lambda 100LOC]:::green
LL --> CSE[CSE 50LOC]
P[Plan logical lambda into physical expr 25LOC]
UDF1["Extend ScalarUDF[Impl] 300LOC"]
UDF2[ScalarUDF lambda schema helpers 100LOC]
AM[Non functional array_transform 270LOC]
PSFL[ScalarFunction PhysicalExpr lambda aware 30LOC]
PL --> PSFL
UDF1 --> PSFL
UDF1 --> AM
%%CILP[Column::is_lambda_parameter 6LOC]
subgraph "Expr::*_with_schema"
LTN[Expr::*_with_schema API def 50LOC]:::green
LTNB[make Expr::*_with_schema lambda aware 70LOC]
LTN --> LTNES[Expr Simplifier 100LOC]
LTNA>"
use Expr::*_with_schema in existing code
Type Coercion 110LOC
normalize_col[with_schemas_and_ambiguity_check] 31LOC
SessionState::create_physical_expr 4LOC
Expr::infer_placeholder_type 1LOC
ApplyFunctionRewrites 10LOC
optmize_projections::rewrite_expr 2LOC
SqlToRel::try_process_group_by_unnest 10LOC
sql resolve_columns 5LOC
Example type_coercion_demo 10LOC
"]
end
PL[Physical Lambda Expr 108LOC]:::green
subgraph "PhysicalExpr::*_with_schema"
PTN[PhysicalExpr::*_with_schema API def 25LOC]:::green
PTNA>"
use PhysicalExpr::*_with_schema in ProjectionMapping 32LOC
PhysicalExprSimplifier 20LOC
unwrap_cast_in_comparison 10LOC
AsyncMapper::find_references 1LOC
Example CustomCastsPhysicalExprAdapter 10LOC
"]
PTNB[make PhysicalExpr::*_with_schema lambda aware 160LOC]
end
subgraph capture
CS[Capture support 30LOC]
LCH["List capture helpers 60LOC(4x15)"]
MCH["Merge capture with arguments helper 66LOC(2x33)"]
AMCT[array_transform capture support 20LOC]
end
PSFL --> CS
PTN --> CS
LL --> SQL
LL --> P
UDF2 --> LTNES
UDF2 --> LTNB
UDF2 --> PTNB
LTN --> LTNA
LTN --> LTNB
LL --> LTNB
LTN --> UDF2
LL --> UDF1 --> UDF2
UDF2 --> P
PL --> P
PL --> PTNB
PTN --> PTNB
PTN --> PTNA
AM --> AMCT
CS --> AMCT
LCH --> AMCT
MCH --> AMCT
LL --> LTNLP2
subgraph "Expr::*_with_lambdas_params"
LTNLP --> LTNLP2[make Expr::*_with_lambdas_params lambda aware]
LTNLP[Expr::*_with_lambdas_params API def 50LOC]:::green -->
AAAAA>"
expr_applicable_for_cols 5LOC
Expr::add_column_refs[_counts]/any_column_refs 15LOC
expr_to_columns 10LOC
columnize_expr 15LOC
find_columns_referenced_by_expr 10LOC
find_column_indexes_referenced_by_expr 5LOC
normalize_col 10LOC
normalize_col_with_schemas_and_ambiguity_check 15LOC
replace_col 10LOC
transform_up_with_lambdas_params 10LOC
filter_exprs_evaluation_result_on_empty_batch 10LOC
replace_cols_by_name 10LOC
optimizer::push_down_filter::contain 15LOC
ScalarSubqueryToJoin 40LOC
TableAliasRewriter 20LOC
Example ShreddedJsonRewriter 30LOC
"]
end
PL --> PTNLP2
subgraph "PhysicalExpr::*_with_lambdas_params"
PTNLP --> PTNLP2[make PhysicalExpr::*_with_lambdas_params lambda aware]
PTNLP[PhysicalExpr::*_with_lambdas_params API def 50LOC]:::green -->
BBBBB>"
CustomPhysicalExprAdapter 3LOC
ParquetPushdownChecker 13LOC
add_offset_to_expr 3LOC
update_expr 10LOC
project_ordering 20LOC
with_new_schema 10LOC
collect_columns 10LOC
reassign_expr_columns 10LOC
DefaultPhysicalExprAdapter 40LOC
projection pushdown 50LOC
sort pushdown 40LOC
projection 50LOC
stream_join_utils 40LOC
pruning predicate rewrite_column_expr 5LOC
Example DefaultValuePhysicalExprAdapter 20LOC
"]
end
```
</details>
<br>
<br>
Right-to-Left [full-screen
link](https://mermaid.live/view#pako:eNqNWG2PmzgQ_isWUqXeadsL5G3Lh0pZutW1SlquSe_DkZXlBSehBUNt09206n-_sQ2Et6SL1AaGefXMPDPsTyvMImq51p6T_IA-LbcMwRUmRIg3dIfuk4KiXZwkrrrrvNxzSpl5q2-3zDCI4t6o21rrf5ZbCwXwg64ny4_eneFQF9B8_cInXFA0m_Zef9avP7NcM9ivTgyURebG_L9cBstsH4ckQbePOXfdJUnvI4Ls0UjJuG7ln2FGL168Rt76NoB_aDo6qfUDPyEMJaWuxGiJmcxQfjgKTaRgADkNZz-_eWsHW-v2UYJXaA08hAMteJfmyR0aaxe21onZCWqeyoIIDzQl6ECTnHJRuW0kFqvgQwanXLBQxhkDDwjn5IglJ0zsMp4iZ94MYf12WRp4W4ogv_RdnU1lkjwQTsE7I1nKmpNRKk6hnSMtVpXYs2feO0illyVFylw3FtjYwJA3klJJOZqdHOyViMnYn_ghlgdsTmJrnepgufkQDLCghf8ORVCE04EUl3I3QUq-UjQk3TqF5vmVojpE-L1da-NoHUM2410MsbSyU7IvXjddVlchhg3HDCooFjJme6Rary21OeYUeRnlocqbbevaaXEwyDhJ4h8Uh1kSNDQLTFiEIax4X8TyiIEYfoX6s3sq1lQIUL-WRFLXDTmFX1zVN9b1PekJmVhitqMc5wkJ6SFLIriXyuO-jUWeJ8eqAD_RBx5Lqgq7x5jlMlXR5Dz7QjW3cF1uBIwvTj-Ab8km-0QT15X8qCRDiAjveVbk-P6IC8aokEPGxLcEcSqy5Ls-PihXgaYDoRJINkUqNGAzycARTbOuzrqtO3jkL4Oq51AJRbqK7NF1v1h7DdFs1wuN4UNjXGCtG8RgVb9B_HNle0kplK9fZ2pF8lzV8bifoqaORu84_ZwU7AHixiEREsc
MzjsF2IiFKv8-80IcWajMUvBsF0PBcwoVSVmoqutsJr1CyCz1wIRoOraISC51Rw9ntTylEkYuHUsLTuzZ6OykqtMcgumCN9rfWweeoQFTnmdc1uhcA433NwyaJaBHJV6PDG3y-eTRnv7Rcn-lRVaU72ktoxyHIbIvUsqkKFWgmcLo587jeNzWsFh5m6A7csKOp0434tMwKodtSSiRtXouZ7HaEprP_mlYnoC4T7vpkPya1EDwRY9y07K17AkpfS2Weuad3pxstkdn-6lWXcU9SFlUg76cq97GELx1hwD571BWLUo3rqXvKOK5WWuKVpgxLTozd-k3tAzM0bZwqwHuOoqCS4LDY1zZbpSgunpQpYYDBihIoCnvE4qhMBWoDyO6coBEUYn6CjVEAA8FNMDdX4Qdmy-Q3degjcmsnhoD2GReqVmmx9aAEg1YpYYTcEVqZhmR0SURQMeIPtJByenlPQH9bpHAv1skhsLhVC8C5wzUaIFhLA-lfjDcBABZxyQw_U6SgqhBAyGLIpEY7miag0_3RIaHIQUNn4Q6HQYL6Jm1I1Z7B5RFXogDjrIHho11WIkyJglMuoGYzWa9Lu6_FZQfN9n7DPgmff0bVZKLJCaiXH4GZ181n9YHTqOIRu9FvSxxg_7ndo0uzpSt_pRN4nzb-3Xb-422f4KSbvt3VAZPUfEUILhRVw8IzHQfGuzj_l5COGRO-pBzlXJP1bbaAPqcCi2y3U5Qqfped1mfqcgjUu2pA1VWLrU447Aqq01poAb0WTD6UK0Sw8iSKDUXwAeWeFjq90y7cokRvtsJtNLQaU3OBqA-RvLyyEyCOnu12gFqhst6hsQluJ_iL9BLuJAxQPigioKpI8yhVcBzSVH1lVDC4xkgrHqsDPxfwBQ6FL1zqd2sK2vP48iCD46CXlkpBexUj9ZPxbS1JKSObi0XbiPCv6oK_QUyOWH_ZVlaicEHyv5guTuSCHgytfMmJtCtJxYwR7mnBpPl
TsZaheX-tB4t15lPX85H9tieje3pK2fmTK-so-W-cGavXtrX89FcE8fXzuzXlfVDm7VfTp35xJmPJ_PpfDSb2fMrC05PZnxl_uqj__jz63_6m29E)
<details>
```mermaid
graph RL
classDef blue fill:blue
classDef green fill:green
subgraph "SQL" [SQL 84LOC]
SQLP[SQL Parse 65LOC]
SQLU[SQL Unparse 19LOC]
end
LL[Logical Expr::Lambda 100LOC]:::green
LL --> CSE[CSE 50LOC]
P[Plan logical lambda into physical expr 25LOC]
UDF1["Extend ScalarUDF[Impl] 300LOC"]
UDF2[ScalarUDF lambda schema helpers 100LOC]
AM[Non functional array_transform 270LOC]
PSFL[ScalarFunction PhysicalExpr lambda aware 30LOC]
PL --> PSFL
UDF1 --> PSFL
UDF1 --> AM
%%CILP[Column::is_lambda_parameter 6LOC]
subgraph "Expr::*_with_schema"
LTN[Expr::*_with_schema API def 50LOC]:::green
LTNB[make Expr::*_with_schema lambda aware 70LOC]
LTN --> LTNES[Expr Simplifier 100LOC]
LTNA>"
use Expr::*_with_schema in existing code
Type Coercion 110LOC
normalize_col[with_schemas_and_ambiguity_check] 31LOC
SessionState::create_physical_expr 4LOC
Expr::infer_placeholder_type 1LOC
ApplyFunctionRewrites 10LOC
optmize_projections::rewrite_expr 2LOC
SqlToRel::try_process_group_by_unnest 10LOC
sql resolve_columns 5LOC
Example type_coercion_demo 10LOC
"]
end
PL[Physical Lambda Expr 108LOC]:::green
subgraph "PhysicalExpr::*_with_schema"
PTN[PhysicalExpr::*_with_schema API def 25LOC]:::green
PTNA>"
use PhysicalExpr::*_with_schema in ProjectionMapping 32LOC
PhysicalExprSimplifier 20LOC
unwrap_cast_in_comparison 10LOC
AsyncMapper::find_references 1LOC
Example CustomCastsPhysicalExprAdapter 10LOC
"]
PTNB[make PhysicalExpr::*_with_schema lambda aware 160LOC]
end
subgraph capture
CS[Capture support 30LOC]
LCH["List capture helpers 60LOC(4x15)"]
MCH["Merge capture with arguments helper 66LOC(2x33)"]
AMCT[array_transform capture support 20LOC]
end
PSFL --> CS
PTN --> CS
LL --> SQL
LL --> P
UDF2 --> LTNES
UDF2 --> LTNB
UDF2 --> PTNB
LTN --> LTNA
LTN --> LTNB
LL --> LTNB
LTN --> UDF2
LL --> UDF1 --> UDF2
UDF2 --> P
PL --> P
PL --> PTNB
PTN --> PTNB
PTN --> PTNA
AM --> AMCT
CS --> AMCT
LCH --> AMCT
MCH --> AMCT
LL --> LTNLP2
subgraph "Expr::*_with_lambdas_params"
LTNLP --> LTNLP2[make Expr::*_with_lambdas_params lambda aware]
LTNLP[Expr::*_with_lambdas_params API def 50LOC]:::green -->
AAAAA>"
expr_applicable_for_cols 5LOC
Expr::add_column_refs[_counts]/any_column_refs 15LOC
expr_to_columns 10LOC
columnize_expr 15LOC
find_columns_referenced_by_expr 10LOC
find_column_indexes_referenced_by_expr 5LOC
normalize_col 10LOC
normalize_col_with_schemas_and_ambiguity_check 15LOC
replace_col 10LOC
transform_up_with_lambdas_params 10LOC
filter_exprs_evaluation_result_on_empty_batch 10LOC
replace_cols_by_name 10LOC
optimizer::push_down_filter::contain 15LOC
ScalarSubqueryToJoin 40LOC
TableAliasRewriter 20LOC
Example ShreddedJsonRewriter 30LOC
"]
end
PL --> PTNLP2
subgraph "PhysicalExpr::*_with_lambdas_params"
PTNLP --> PTNLP2[make PhysicalExpr::*_with_lambdas_params lambda aware]
PTNLP[PhysicalExpr::*_with_lambdas_params API def 50LOC]:::green -->
BBBBB>"
CustomPhysicalExprAdapter 3LOC
ParquetPushdownChecker 13LOC
add_offset_to_expr 3LOC
update_expr 10LOC
project_ordering 20LOC
with_new_schema 10LOC
collect_columns 10LOC
reassign_expr_columns 10LOC
DefaultPhysicalExprAdapter 40LOC
projection pushdown 50LOC
sort pushdown 40LOC
projection 50LOC
stream_join_utils 40LOC
pruning predicate rewrite_column_expr 5LOC
Example DefaultValuePhysicalExprAdapter 20LOC
"]
end
```
</details>
<br>
<br>
Bottom-to-Top [full-screen
link](https://mermaid.live/view#pako:eNqNWFuP0zgU_itWpJXY1cC2naaUPCB1wqAFtWyWln3YdGR5ErcNJHawHWYK4r_vcZykubhlIkET59wv3zmZH07EY-p4zl6Q_IBuNluG4IpSIuUbukP3aUHRLklTT9_1Xu4Fpcy8LW-3zBDI4t6I2zrrf5ZbB4Xwg-bT5d_-naHQF5wF5YuACEnRzB28_lS-_sTykmD86kRAWWxuzP_LZbjk-yQiKbp9zIXnLUl2HxM0Ho00j-fV9hli9Pz5a-Svb0P4h9zRSWwQBilhKK1kpUZKwhRH-eEoy0MKCtCkZeynN2_H4da5fVRgFVoDDRFwFr7L8vQOXZcmbJ0T8SRsaGoNMjrQjKADTXMqZG224Viswg8colywSCWcgQVECHLEShAmd1xkaPKy7cL67bJS8LZiQUFlu45NrZI8EEHBOsNZ8ZrIaBEn184dLVY122-_-e8glT5Pi4x5XiKx0YEhbySjigo0Oxk4KBGTsT_wQ6IO2ERi65zqYLn5EFpI0CJ4h2IoQteS4orvJszIF4ps3J0otONXsZYuwu_tulSO1glkM9kl4EsnOxX54nXbZH0V0q44YVBBiVQJ2yPdel2uzTGnyOdURDpv43FZOx0KBhknafKd4oinYUuyxITFGNxK9kWijhgOoy9Qf-OBiDWVEsSvFVHU8yJB4RfX9Y3L-p4OmIwvCdtRgfOURPTA0xjulbZ4qGOR5-mxLsCP9EEkiurCHhDyXGXam1zwz7Sklp4nDIOxZTJ04Gu64R9p6nlKHDVnBB7hveBFju-PuGCMSmVTJr-mSFDJ029l-KBcJXItrhJINkXaNSAzycAxzXhfZtPWPTwKlmHdc6iCorKKxqP5sFgHDdFu1wuNEUBjXCBtGsRg1bBBgnNle0kolG_QZGpF8lzX8fUwRW0Zrd6ZDHNSsAfwG0dEKpw
wiHcGsJFIXf5D4oU8skirpWDZLoGCFxQqkrJIV9fZTPqFVDzzQYVsG7aISa7KjrZntYpSBSOXwtKBk_FsdHZSNWmOQHUhWu3vr0PfnAFRnnOhGnRugMb_CwbNEtCjZm9GRqny2fRx7P7eMX9Vsqyo2NOGRxsOQ2RfZJQpWYlAM43RzyaP19ddCYuVvwn7IyfqWTrpe3waRtWwrQ4qZK2fq1mst4T2c3AalicgHp7d9I6C5qiF4IvByU1H13LApOV1SJqZd3pz0tkdnd2nRnTtt_VkUQ_6aq761RLmr3sHkP_eyapz0vdrGUz04blZa4pWmjEtezN3GbSkWOZol7nTAHc9QeElRvsY17pbJaivAVTp4YABClJoyvuUYihMDep2RNcGkDiuUF-jhgzhoYAGuPuTsGP7BRoPJZTKFG-mhgWbzCs9y8qxZRFSAlYl4QRcsZ5ZhmV0iQXQMaaP1MrpXt4T0K8WCfyrRcLmjqDlInBOQYMWGMayLfVWd1MA5NIniek3khZEDxpwWRapwnBHsxxsuicqOtgEtGySOjoMFtAza0ei9w4oi7yQBxzzB4aNdliJOFMEJp3FZ7NZr4v7rwUVxw1_z4FuOpS_0SW5SBMiq-XHOvvq-bQ-CBrHNH4vm2VJGPQ_t2v0caZq9adsEufbPmjaPmi1_ROE9Nu_JzJ8ioinAMGNvgZAYKa7bbBfD_cSIiBzKoCc65T7urb1BjCk1GjBdztJle77ssuGREUek3pPtVRZtdRiLmBV1puSpQbKWDD6UK8SdmRJtZgL4ANLPCz1e1aacokQvtsJtJItWtOzDuiPkbwKmUlQb6_WO0BDcFmOjV2B-Rn-DL2EC5UAhFtFFEyHMIdWAcsVRfVXQgWPZ4Cw7rHK8X8BU6jN-8mldnOunL1IYgc-OAp65WQUsFM_Oj800dZRkDq6dTy4jYn4oiv0J_DkhP3HeVazwQfK_uB4O5JKeDK18yYh0K0nElBHha8Hk-O5
o1Epw_F-OI_w6L4YuZPRaDSfutOxO5lfOUfHG0_mL9zZxJ3Op7Pr2XT2avLzyvleah29mL90rxyIl-JiZf7OU_655-f_jC5sWg)
<details>
```mermaid
graph BT
classDef blue fill:blue
classDef green fill:green
subgraph "SQL" [SQL 84LOC]
SQLP[SQL Parse 65LOC]
SQLU[SQL Unparse 19LOC]
end
LL[Logical Expr::Lambda 100LOC]:::green
LL --> CSE[CSE 50LOC]
P[Plan logical lambda into physical expr 25LOC]
UDF1["Extend ScalarUDF[Impl] 300LOC"]
UDF2[ScalarUDF lambda schema helpers 100LOC]
AM[Non functional array_transform 270LOC]
PSFL[ScalarFunction PhysicalExpr lambda aware 30LOC]
PL --> PSFL
UDF1 --> PSFL
UDF1 --> AM
%%CILP[Column::is_lambda_parameter 6LOC]
subgraph "Expr::*_with_schema"
LTN[Expr::*_with_schema API def 50LOC]:::green
LTNB[make Expr::*_with_schema lambda aware 70LOC]
LTN --> LTNES[Expr Simplifier 100LOC]
LTNA>"
use Expr::*_with_schema in existing code
Type Coercion 110LOC
normalize_col[with_schemas_and_ambiguity_check] 31LOC
SessionState::create_physical_expr 4LOC
Expr::infer_placeholder_type 1LOC
ApplyFunctionRewrites 10LOC
optmize_projections::rewrite_expr 2LOC
SqlToRel::try_process_group_by_unnest 10LOC
sql resolve_columns 5LOC
Example type_coercion_demo 10LOC
"]
end
PL[Physical Lambda Expr 108LOC]:::green
subgraph "PhysicalExpr::*_with_schema"
PTN[PhysicalExpr::*_with_schema API def 25LOC]:::green
PTNA>"
use PhysicalExpr::*_with_schema in ProjectionMapping 32LOC
PhysicalExprSimplifier 20LOC
unwrap_cast_in_comparison 10LOC
AsyncMapper::find_references 1LOC
Example CustomCastsPhysicalExprAdapter 10LOC
"]
PTNB[make PhysicalExpr::*_with_schema lambda aware 160LOC]
end
subgraph capture
CS[Capture support 30LOC]
LCH["List capture helpers 60LOC(4x15)"]
MCH["Merge capture with arguments helper 66LOC(2x33)"]
AMCT[array_transform capture support 20LOC]
end
PSFL --> CS
PTN --> CS
LL --> SQL
LL --> P
UDF2 --> LTNES
UDF2 --> LTNB
UDF2 --> PTNB
LTN --> LTNA
LTN --> LTNB
LL --> LTNB
LTN --> UDF2
LL --> UDF1 --> UDF2
UDF2 --> P
PL --> P
PL --> PTNB
PTN --> PTNB
PTN --> PTNA
AM --> AMCT
CS --> AMCT
LCH --> AMCT
MCH --> AMCT
LL --> LTNLP2
subgraph "Expr::*_with_lambdas_params"
LTNLP --> LTNLP2[make Expr::*_with_lambdas_params lambda aware]
LTNLP[Expr::*_with_lambdas_params API def 50LOC]:::green -->
AAAAA>"
expr_applicable_for_cols 5LOC
Expr::add_column_refs[_counts]/any_column_refs 15LOC
expr_to_columns 10LOC
columnize_expr 15LOC
find_columns_referenced_by_expr 10LOC
find_column_indexes_referenced_by_expr 5LOC
normalize_col 10LOC
normalize_col_with_schemas_and_ambiguity_check 15LOC
replace_col 10LOC
transform_up_with_lambdas_params 10LOC
filter_exprs_evaluation_result_on_empty_batch 10LOC
replace_cols_by_name 10LOC
optimizer::push_down_filter::contain 15LOC
ScalarSubqueryToJoin 40LOC
TableAliasRewriter 20LOC
Example ShreddedJsonRewriter 30LOC
"]
end
PL --> PTNLP2
subgraph "PhysicalExpr::*_with_lambdas_params"
PTNLP --> PTNLP2[make PhysicalExpr::*_with_lambdas_params lambda aware]
PTNLP[PhysicalExpr::*_with_lambdas_params API def 50LOC]:::green -->
BBBBB>"
CustomPhysicalExprAdapter 3LOC
ParquetPushdownChecker 13LOC
add_offset_to_expr 3LOC
update_expr 10LOC
project_ordering 20LOC
with_new_schema 10LOC
collect_columns 10LOC
reassign_expr_columns 10LOC
DefaultPhysicalExprAdapter 40LOC
projection pushdown 50LOC
sort pushdown 40LOC
projection 50LOC
stream_join_utils 40LOC
pruning predicate rewrite_column_expr 5LOC
Example DefaultValuePhysicalExprAdapter 20LOC
"]
end
```
</details>