drewrip opened a new issue, #22961:
URL: https://github.com/apache/datafusion/issues/22961
### Describe the bug
The logical optimizer (i.e., the rules applied by `into_optimized_plan()`) produces
logical plans that can't be unparsed into correct SQL. This doesn't happen for
every query, but it does affect many of the queries I'm trying.
#### TL;DR
**Succeeds:**
1. parse SQL - ✅
2. build unoptimized plan - ✅
3. unparse SQL - ✅
4. execute against DuckDB - ✅

**Fails:**
1. parse SQL - ✅
2. build *optimized* plan - ✅
3. unparse SQL - ❌
4. execute against DuckDB - ❌
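
The comparison above can be sketched as a small, library-agnostic harness that runs the stages in order and reports the first one that fails. This is a hypothetical helper, not part of DataFusion. The closures are assumed stand-ins: in the reproducer below, parsing and planning happen together in `SessionContext::sql` followed by `into_unoptimized_plan()` or `into_optimized_plan()`, unparsing is `Unparser::plan_to_sql`, and execution is DuckDB's `Connection::prepare`.

```rust
/// The four stages of the round-trip check listed in the TL;DR.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Stage {
    Parse,
    Plan,
    Unparse,
    Execute,
}

/// Runs SQL -> AST -> plan -> SQL -> execution and reports the first failing stage.
///
/// Hypothetical helper: every closure is a stand-in for the real library call.
pub fn roundtrip<A, P>(
    sql: &str,
    parse: impl Fn(&str) -> Result<A, String>,
    plan: impl Fn(A) -> Result<P, String>,
    unparse: impl Fn(&P) -> Result<String, String>,
    execute: impl Fn(&str) -> Result<(), String>,
) -> Result<String, (Stage, String)> {
    let ast = parse(sql).map_err(|e| (Stage::Parse, e))?;
    let logical_plan = plan(ast).map_err(|e| (Stage::Plan, e))?;
    let regenerated = unparse(&logical_plan).map_err(|e| (Stage::Unparse, e))?;
    execute(&regenerated).map_err(|e| (Stage::Execute, e))?;
    // Every stage succeeded: return the SQL that was actually executed.
    Ok(regenerated)
}
```

Running it once with the unoptimized planning step and once with the optimized one makes the difference between the two checklists explicit: the first returns `Ok`, the second returns `Err((Stage::Unparse, ..))` or `Err((Stage::Execute, ..))`.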
### To Reproduce
Here is a simple example that reproduces the behavior I'm describing:
```toml
[dependencies]
datafusion = "54.0.0"
duckdb = { version = "1.10503.1", features = ["bundled"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
```
```rust
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::catalog::{
    CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider,
    SchemaProvider,
};
use datafusion::datasource::empty::EmptyTable;
use datafusion::optimizer::OptimizerRule;
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use datafusion::prelude::*;
use datafusion::sql::unparser::Unparser;
use datafusion::sql::unparser::dialect::DuckDBDialect;
use duckdb::Connection;

const QUERY: &str = r#"
SELECT * FROM
(
    SELECT
        item_id,
        order_id,
        product_id,
        quantity,
        unit_price,
        quantity * unit_price AS line_total
    FROM
        "warehouse"."main"."order_items"
) oi
JOIN (
    SELECT
        order_id,
        customer_id,
        order_date,
        lower(STATUS) AS STATUS,
        lower(channel) AS channel,
        coalesce(discount_pct, 0) AS discount_pct,
        coalesce(shipping_cost, 0) AS shipping_cost,
        STATUS IN ('completed', 'shipped') AS is_fulfilled
    FROM
        "warehouse"."main"."orders"
) o USING (order_id)
JOIN (
    SELECT
        p.product_id,
        p.category_id,
        p.sku,
        p.name AS product_name,
        p.price,
        p.cost,
        p.weight_kg,
        p.is_active,
        p.stock_qty,
        round(p.price - p.cost, 2) AS gross_margin,
        round((p.price - p.cost) / nullif(p.price, 0), 4) AS margin_pct,
        c.name AS category_name
    FROM
        "warehouse"."main"."products" p
        LEFT JOIN "warehouse"."main"."categories" c USING (category_id)
) p USING (product_id)
"#;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("registering tables");

    let order_items_schema = Arc::new(Schema::new(vec![
        Field::new("item_id", DataType::Int32, false),
        Field::new("order_id", DataType::Int32, true),
        Field::new("product_id", DataType::Int32, true),
        Field::new("quantity", DataType::Int32, true),
        Field::new("unit_price", DataType::Decimal128(10, 2), true),
    ]));
    let orders_schema = Arc::new(Schema::new(vec![
        Field::new("order_id", DataType::Int32, false),
        Field::new("customer_id", DataType::Int32, true),
        Field::new("order_date", DataType::Date32, true),
        Field::new("status", DataType::Utf8, true),
        Field::new("channel", DataType::Utf8, true),
        Field::new("discount_pct", DataType::Decimal128(5, 2), true),
        Field::new("shipping_cost", DataType::Decimal128(8, 2), true),
    ]));
    let products_schema = Arc::new(Schema::new(vec![
        Field::new("product_id", DataType::Int32, false),
        Field::new("category_id", DataType::Int32, true),
        Field::new("sku", DataType::Utf8, true),
        Field::new("name", DataType::Utf8, true),
        Field::new("price", DataType::Decimal128(10, 2), true),
        Field::new("cost", DataType::Decimal128(10, 2), true),
        Field::new("weight_kg", DataType::Decimal128(6, 3), true),
        Field::new("is_active", DataType::Boolean, true),
        Field::new("stock_qty", DataType::Int32, true),
    ]));
    let categories_schema = Arc::new(Schema::new(vec![
        Field::new("category_id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
        Field::new("parent_id", DataType::Int32, true),
        Field::new("display_rank", DataType::Int32, true),
    ]));

    let ctx = SessionContext::new();
    // ctx.remove_optimizer_rule(CommonSubexprEliminate::new().name());

    let schema_provider = Arc::new(MemorySchemaProvider::new());
    schema_provider.register_table(
        "order_items".to_string(),
        Arc::new(EmptyTable::new(order_items_schema)),
    )?;
    schema_provider.register_table(
        "orders".to_string(),
        Arc::new(EmptyTable::new(orders_schema)),
    )?;
    schema_provider.register_table(
        "products".to_string(),
        Arc::new(EmptyTable::new(products_schema)),
    )?;
    schema_provider.register_table(
        "categories".to_string(),
        Arc::new(EmptyTable::new(categories_schema)),
    )?;
    let catalog = Arc::new(MemoryCatalogProvider::new());
    catalog.register_schema("main", schema_provider)?;
    ctx.register_catalog("warehouse", catalog);

    let dialect = DuckDBDialect::new();
    let unparser = Unparser::new(&dialect);
    let conn = Connection::open("warehouse.duckdb")?;

    {
        println!("Unoptimized query:");
        let unopt_plan = ctx.sql(QUERY).await?.into_unoptimized_plan();
        let sql = unparser.plan_to_sql(&unopt_plan)?;
        let mut stmt = conn.prepare(&sql.to_string())?;
        match stmt.query([]) {
            Ok(_) => {
                println!("success");
            }
            Err(e) => {
                println!("failed: {e}");
            }
        }
    }

    println!("building optimized plan");
    let plan = ctx.sql(QUERY).await?.into_optimized_plan()?;
    println!("unparsing optimized plan");
    let sql = unparser.plan_to_sql(&plan)?;
    println!("Generated SQL:\n{sql}");

    {
        println!("Optimized query:");
        let mut stmt = conn.prepare(&sql.to_string())?;
        match stmt.query([]) {
            Ok(_) => {
                println!("success");
            }
            Err(e) => {
                println!("failed: {e}");
            }
        }
    }

    Ok(())
}
```
Generate the empty DuckDB tables:
```bash
duckdb warehouse.duckdb "
CREATE TABLE IF NOT EXISTS main.order_items (
    item_id INTEGER NOT NULL,
    order_id INTEGER,
    product_id INTEGER,
    quantity INTEGER,
    unit_price DECIMAL(10, 2)
);
CREATE TABLE IF NOT EXISTS main.orders (
    order_id INTEGER NOT NULL,
    customer_id INTEGER,
    order_date DATE,
    status VARCHAR,
    channel VARCHAR,
    discount_pct DECIMAL(5, 2),
    shipping_cost DECIMAL(8, 2)
);
CREATE TABLE IF NOT EXISTS main.products (
    product_id INTEGER NOT NULL,
    category_id INTEGER,
    sku VARCHAR,
    name VARCHAR,
    price DECIMAL(10, 2),
    cost DECIMAL(10, 2),
    weight_kg DECIMAL(6, 3),
    is_active BOOLEAN,
    stock_qty INTEGER
);
CREATE TABLE IF NOT EXISTS main.categories (
    category_id INTEGER NOT NULL,
    name VARCHAR,
    parent_id INTEGER,
    display_rank INTEGER
);
"
```
The reproducer will result in the following error:
```
registering tables
Unoptimized query:
success
building optimized plan
unparsing optimized plan
Error: SchemaError(FieldNotFound { field: Column { relation: Some(Bare {
table: "o" }), name: "__common_expr_1" }, valid_fields: [Column { relation:
None, name: "__common_expr_1" }, Column { relation: None, name:
"__common_expr_2" }, Column { relation: Some(Bare { table: "o" }), name:
"order_id" }, Column { relation: Some(Bare { table: "o" }), name: "customer_id"
}, Column { relation: Some(Bare { table: "o" }), name: "order_date" }, Column {
relation: Some(Bare { table: "o" }), name: "status" }, Column { relation:
Some(Bare { table: "o" }), name: "channel" }] }, Some(""))
```
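
The `FieldNotFound` error is a qualifier mismatch: the lookup asks for `o.__common_expr_1` (qualified with the subquery alias `o`), while `valid_fields` contains only an unqualified `__common_expr_1`. The resolver below is a toy model of qualified column lookup, assumed for illustration rather than taken from DataFusion's `DFSchema` code. It shows why such a lookup fails even though a column with the right name exists.

```rust
/// A column reference or schema field: an optional relation qualifier plus a name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Column {
    pub relation: Option<String>,
    pub name: String,
}

impl Column {
    pub fn new(relation: Option<&str>, name: &str) -> Self {
        Column {
            relation: relation.map(str::to_string),
            name: name.to_string(),
        }
    }
}

/// Toy model of column resolution (simplified; not DataFusion's implementation).
/// A qualified reference such as `o.x` only matches a field qualified with `o`;
/// an unqualified reference matches any field with that name, if it is unique.
pub fn resolve<'a>(fields: &'a [Column], col: &Column) -> Result<&'a Column, String> {
    let matches: Vec<&Column> = fields
        .iter()
        .filter(|f| f.name == col.name)
        .filter(|f| match &col.relation {
            Some(rel) => f.relation.as_deref() == Some(rel.as_str()),
            None => true,
        })
        .collect();
    match matches.as_slice() {
        [only] => Ok(*only),
        [] => Err(format!("FieldNotFound: {col:?}")),
        _ => Err(format!("Ambiguous reference: {col:?}")),
    }
}
```

With the fields from the `valid_fields` list above, `resolve` succeeds for an unqualified `__common_expr_1` and fails for `o.__common_expr_1`. The sketch does not show where the `o` qualifier gets attached; that is the behavior this issue is about.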
The common subexpression elimination (CSE) pass is clearly involved
(`__common_expr_1` is a column it generates), so let's disable that rule and
try again:
```rust
ctx.remove_optimizer_rule(CommonSubexprEliminate::new().name());
```
This results in a different error, now coming from DuckDB:
```
registering tables
Unoptimized query:
success
building optimized plan
unparsing optimized plan
Generated SQL:
SELECT "oi"."item_id", "oi"."product_id", "oi"."quantity",
"oi"."unit_price", "oi"."line_total", "o"."order_id", "o"."customer_id",
"o"."order_date", "o"."status", "o"."channel", "o"."discount_pct",
"o"."shipping_cost", "o"."is_fulfilled", "p"."category_id", "p"."sku",
"p"."product_name", "p"."price", "p"."cost", "p"."weight_kg", "p"."is_active",
"p"."stock_qty", "p"."gross_margin", "p"."margin_pct", "p"."category_name" FROM
(SELECT "oi"."item_id", "oi"."product_id", "oi"."quantity", "oi"."unit_price",
"oi"."line_total", "o"."order_id", "o"."customer_id", "o"."order_date",
"o"."status", "o"."channel", "o"."discount_pct", "o"."shipping_cost",
"o"."is_fulfilled" FROM (SELECT "oi"."item_id", "oi"."order_id",
"oi"."product_id", "oi"."quantity", "oi"."unit_price", (CAST("oi"."quantity" AS
DECIMAL(10,0)) * "oi"."unit_price") AS "line_total" FROM
"warehouse"."main"."order_items" AS "oi") AS "oi" INNER JOIN (SELECT
"o"."order_id", "o"."customer_id", "o"."order_date", lower("o"."status") AS
"status", lower("o"."channel") AS "channel", CASE WHEN
CAST("o"."discount_pct" AS DECIMAL(22,2)) IS NOT NULL THEN
CAST("o"."discount_pct" AS DECIMAL(22,2)) ELSE 0.00 END AS "discount_pct", CASE
WHEN CAST("o"."shipping_cost" AS DECIMAL(22,2)) IS NOT NULL THEN
CAST("o"."shipping_cost" AS DECIMAL(22,2)) ELSE 0.00 END AS "shipping_cost",
(("o"."status" = 'completed') OR ("o"."status" = 'shipped')) AS "is_fulfilled"
FROM "warehouse"."main"."orders" AS "o") AS "o" ON "oi"."order_id" =
"o"."order_id") INNER JOIN (SELECT "p"."product_id", "p"."category_id",
"p"."sku", "p"."name" AS "product_name", "p"."price", "p"."cost",
"p"."weight_kg", "p"."is_active", "p"."stock_qty", round(("p"."price" -
"p"."cost"), 2) AS "gross_margin", round((("p"."price" - "p"."cost") //
nullif(CAST("p"."price" AS DECIMAL(22,2)), 0.00)), 4) AS "margin_pct",
"c"."name" AS "category_name" FROM "warehouse"."main"."products" AS "p" LEFT
OUTER JOIN "warehouse"."main"."categories" AS "c" USING("category_id")) AS "p"
ON "oi"."product_id" = "p"."product_id"
Optimized query:
Error: DuckDBFailure(Error { code: Unknown, extended_code: 1 }, Some("Binder
Error: Referenced table \"oi\" not found!\nCandidate tables: \"p\"\n\nLINE 1:
...\".\"main\".\"categories\" AS \"c\" USING(\"category_id\")) AS \"p\" ON
\"oi\".\"product_id\" = \"p\".\"product_id\"\n
```
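
The DuckDB binder error follows from ordinary SQL scoping. In the generated SQL, the `oi`/`o` join is wrapped in a parenthesized derived table with no alias, so the outer `ON "oi"."product_id" = "p"."product_id"` refers to a qualifier that is only defined inside that subquery. The checker below is a simplified model of name visibility in a `FROM` clause, not DuckDB's binder, and it reproduces the same message shape.

```rust
/// One item in a FROM clause (simplified).
pub enum FromItem<'a> {
    /// A base table, optionally aliased; visible by its alias, else by its name.
    Table { name: &'a str, alias: Option<&'a str> },
    /// A parenthesized subquery (derived table), optionally aliased.
    /// Aliases used *inside* the subquery are not visible outside it.
    Derived { alias: Option<&'a str> },
}

/// Names a query may use as column qualifiers, given its FROM items.
pub fn visible_qualifiers<'a>(from: &[FromItem<'a>]) -> Vec<&'a str> {
    from.iter()
        .filter_map(|item| match item {
            FromItem::Table { name, alias } => Some(alias.unwrap_or(*name)),
            FromItem::Derived { alias } => *alias,
        })
        .collect()
}

/// Reports the first referenced qualifier that names no visible FROM item,
/// using a message modeled on the DuckDB error above.
pub fn check_qualifiers(from: &[FromItem<'_>], referenced: &[&str]) -> Result<(), String> {
    let visible = visible_qualifiers(from);
    for q in referenced {
        if !visible.iter().any(|v| *v == *q) {
            let candidates: Vec<String> = visible.iter().map(|v| format!("\"{v}\"")).collect();
            return Err(format!(
                "Referenced table \"{q}\" not found! Candidate tables: {}",
                candidates.join(", ")
            ));
        }
    }
    Ok(())
}
```

Modeling the outer `FROM` as an unaliased derived table joined to a derived table aliased `p` yields `Referenced table "oi" not found! Candidate tables: "p"`. Giving the inner join its own alias (a hypothetical name) and qualifying the outer columns with it would make the reference resolvable, which is the kind of `SubqueryAlias` handling discussed below.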
### Expected behavior
Both the unoptimized and the optimized plans should unparse into executable
SQL. I don't expect the two SQL statements to be identical, but I do expect
them to be logically equivalent.
### Additional context
As far as I understand the `Unparser`, this behavior is unintended: an optimized
`LogicalPlan` should still unparse into valid SQL. I've run into quite a lot of
similar problems where the resulting SQL is mostly correct, but there are
missing or incorrect `SubqueryAlias` nodes that leave column references in the
resulting query ambiguous or unresolvable (as with `"oi"` above). To me this
seems more likely to be a bug in how the `Unparser` processes plans that have
been mangled by the optimizer than a bug in the optimizer itself. The optimized
`LogicalPlan`s that are generated look correct to me.

I've also seen problems related to the `group_alias_N` columns created by the
[SingleDistinctToGroupBy](https://docs.rs/datafusion/latest/datafusion/optimizer/single_distinct_to_groupby/struct.SingleDistinctToGroupBy.html)
pass, but I haven't included them here because I haven't yet been able to put
together a simple reproducible example.