drewrip opened a new issue, #22961:
URL: https://github.com/apache/datafusion/issues/22961

   ### Describe the bug
   
   The logical optimizer (i.e., `into_optimized_plan()`) creates logical plans 
that can't be unparsed into correct SQL. This doesn't seem to be universal, but 
it does apply to a lot of the queries I'm trying.
   
   ## TLDR
   
   **Succeeds:**
   1. parse SQL - ✅ 
   2. build unoptimized plan - ✅ 
   3. unparse SQL - ✅ 
   4. execute against DuckDB - ✅ 
   
   **Fails:**
   1. parse SQL - ✅ 
   2. build *optimized* plan - ✅ 
   3. unparse SQL - ❌ 
   4. execute against DuckDB - ❌ 
   
   ### To Reproduce
   
   Here is a simple example that reproduces the behavior I'm describing:
   
   ```toml
   datafusion = "54.0.0"
   duckdb = { version = "1.10503.1", features = ["bundled"] }
   tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
   ```
   
   ```rust
   use std::sync::Arc;
   
   use datafusion::arrow::datatypes::{DataType, Field, Schema};
   use datafusion::catalog::{
       CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, 
SchemaProvider,
   };
   use datafusion::datasource::empty::EmptyTable;
   use datafusion::optimizer::OptimizerRule;
   use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
   use datafusion::prelude::*;
   use datafusion::sql::unparser::Unparser;
   use datafusion::sql::unparser::dialect::DuckDBDialect;
   use duckdb::Connection;
   
   /// SQL text that `main` plans, unparses and runs against DuckDB.
   ///
   /// It joins three derived tables (`oi`, `o`, `p`) with `USING`, and each
   /// derived table projects computed columns alongside plain ones. Inside the
   /// third derived table the base table `products` is also aliased `p`, so that
   /// alias appears at two nesting levels.
   const QUERY: &str = r#"
   SELECT * FROM
   (
   SELECT
           item_id,
           order_id,
           product_id,
           quantity,
           unit_price,
           quantity * unit_price AS line_total
       FROM
           "warehouse"."main"."order_items"
   ) oi
   JOIN (
       SELECT
           order_id,
           customer_id,
           order_date,
           lower(STATUS) AS STATUS,
           lower(channel) AS channel,
           coalesce(discount_pct, 0) AS discount_pct,
           coalesce(shipping_cost, 0) AS shipping_cost,
           STATUS IN ('completed', 'shipped') AS is_fulfilled
       FROM
           "warehouse"."main"."orders"
   ) o USING (order_id)
   JOIN (
       SELECT
           p.product_id,
           p.category_id,
           p.sku,
           p.name AS product_name,
           p.price,
           p.cost,
           p.weight_kg,
           p.is_active,
           p.stock_qty,
           round(p.price - p.cost, 2) AS gross_margin,
           round((p.price - p.cost) / nullif(p.price, 0), 4) AS margin_pct,
           c.name AS category_name
       FROM
           "warehouse"."main"."products" p
           LEFT JOIN "warehouse"."main"."categories" c USING (category_id)
   ) p USING (product_id)
   "#;
   
   /// Reproducer entry point.
   ///
   /// Registers four schema-only tables as `warehouse.main.<table>`, unparses
   /// `QUERY` to DuckDB-dialect SQL from both the unoptimized and the optimized
   /// logical plan, and tries each generated statement against the
   /// `warehouse.duckdb` database file.
   ///
   /// # Errors
   ///
   /// Every failure propagated with `?` ends the run: table or schema
   /// registration, opening the DuckDB file, planning, unparsing, and
   /// `conn.prepare`. Only failures from `stmt.query` are caught and printed.
   #[tokio::main]
   async fn main() -> Result<(), Box<dyn std::error::Error>> {
       println!("registering tables");
       // Arrow schemas for the four tables. Column names and order follow the
       // DuckDB `CREATE TABLE` statements that accompany this reproducer.
       let order_items_schema = Arc::new(Schema::new(vec![
           Field::new("item_id", DataType::Int32, false),
           Field::new("order_id", DataType::Int32, true),
           Field::new("product_id", DataType::Int32, true),
           Field::new("quantity", DataType::Int32, true),
           Field::new("unit_price", DataType::Decimal128(10, 2), true),
       ]));
   
       let orders_schema = Arc::new(Schema::new(vec![
           Field::new("order_id", DataType::Int32, false),
           Field::new("customer_id", DataType::Int32, true),
           Field::new("order_date", DataType::Date32, true),
           Field::new("status", DataType::Utf8, true),
           Field::new("channel", DataType::Utf8, true),
           Field::new("discount_pct", DataType::Decimal128(5, 2), true),
           Field::new("shipping_cost", DataType::Decimal128(8, 2), true),
       ]));
   
       let products_schema = Arc::new(Schema::new(vec![
           Field::new("product_id", DataType::Int32, false),
           Field::new("category_id", DataType::Int32, true),
           Field::new("sku", DataType::Utf8, true),
           Field::new("name", DataType::Utf8, true),
           Field::new("price", DataType::Decimal128(10, 2), true),
           Field::new("cost", DataType::Decimal128(10, 2), true),
           Field::new("weight_kg", DataType::Decimal128(6, 3), true),
           Field::new("is_active", DataType::Boolean, true),
           Field::new("stock_qty", DataType::Int32, true),
       ]));
   
       let categories_schema = Arc::new(Schema::new(vec![
           Field::new("category_id", DataType::Int32, false),
           Field::new("name", DataType::Utf8, true),
           Field::new("parent_id", DataType::Int32, true),
           Field::new("display_rank", DataType::Int32, true),
       ]));
   
   
       let ctx = SessionContext::new();
   
       // Uncomment the next line to remove the common-subexpression-elimination
       // rule from the session before the optimized plan is built.
       // ctx.remove_optimizer_rule(CommonSubexprEliminate::new().name());
   
       // Register each table under a single in-memory schema provider.
       let schema_provider = Arc::new(MemorySchemaProvider::new());
       schema_provider.register_table(
           "order_items".to_string(),
           Arc::new(EmptyTable::new(order_items_schema)),
       )?;
       schema_provider.register_table(
           "orders".to_string(),
           Arc::new(EmptyTable::new(orders_schema)),
       )?;
       schema_provider.register_table(
           "products".to_string(),
           Arc::new(EmptyTable::new(products_schema)),
       )?;
       schema_provider.register_table(
           "categories".to_string(),
           Arc::new(EmptyTable::new(categories_schema)),
       )?;
   
       // Expose the schema as "warehouse"."main", matching the fully qualified
       // table names used in QUERY.
       let catalog = Arc::new(MemoryCatalogProvider::new());
       catalog.register_schema("main", schema_provider)?;
       ctx.register_catalog("warehouse", catalog);
   
       let dialect = DuckDBDialect::new();
       let unparser = Unparser::new(&dialect);
       // NOTE(review): presumably this is the database file created by the
       // `duckdb warehouse.duckdb` setup script; confirm it exists before running.
       let conn = Connection::open("warehouse.duckdb")?;
   
       {
           // Baseline: unparse the plan as produced by the SQL planner, before
           // optimization, and run the resulting statement with no parameters.
           println!("Unoptimized query:");
           let unopt_plan = ctx.sql(QUERY).await?.into_unoptimized_plan();
           let sql = unparser.plan_to_sql(&unopt_plan)?;
           // A `prepare` failure propagates via `?`; only `query` errors reach
           // the match below.
           let mut stmt = conn.prepare(&sql.to_string())?;
           match stmt.query([]) {
               Ok(_) => {
                   println!("success");
               }
               Err(e) => {
                   println!("failed: {e}");
               }
           }
       }
   
       // Same check for the optimized plan. The generated SQL is printed before
       // it is prepared so it is visible even when DuckDB rejects it.
       println!("building optimized plan");
       let plan = ctx.sql(QUERY).await?.into_optimized_plan()?;
       println!("unparsing optimized plan");
       let sql = unparser.plan_to_sql(&plan)?;
       println!("Generated SQL:\n{sql}");
   
       {
           println!("Optimized query:");
           // As above, a `prepare` failure ends `main` with an error instead of
           // printing "failed: ...".
           let mut stmt = conn.prepare(&sql.to_string())?;
           match stmt.query([]) {
               Ok(_) => {
                   println!("success");
               }
               Err(e) => {
                   println!("failed: {e}");
               }
           }
       }
   
       Ok(())
   }
   ```
   
   Generate the empty DuckDB tables:
   ```bash
   duckdb warehouse.duckdb "
   CREATE TABLE IF NOT EXISTS main.order_items (
       item_id    INTEGER NOT NULL,
       order_id   INTEGER,
       product_id INTEGER,
       quantity   INTEGER,
       unit_price DECIMAL(10, 2)
   );
   
   CREATE TABLE IF NOT EXISTS main.orders (
       order_id      INTEGER NOT NULL,
       customer_id   INTEGER,
       order_date    DATE,
       status        VARCHAR,
       channel       VARCHAR,
       discount_pct  DECIMAL(5, 2),
       shipping_cost DECIMAL(8, 2)
   );
   
   CREATE TABLE IF NOT EXISTS main.products (
       product_id  INTEGER NOT NULL,
       category_id INTEGER,
       sku         VARCHAR,
       name        VARCHAR,
       price       DECIMAL(10, 2),
       cost        DECIMAL(10, 2),
       weight_kg   DECIMAL(6, 3),
       is_active   BOOLEAN,
       stock_qty   INTEGER
   );
   
   CREATE TABLE IF NOT EXISTS main.categories (
       category_id  INTEGER NOT NULL,
       name         VARCHAR,
       parent_id    INTEGER,
       display_rank INTEGER
   );
   "
   ```
   
   
   The reproducer will result in the following error:
   ```
   registering tables
   Unoptimized query:
   success
   building optimized plan
   unparsing optimized plan
   Error: SchemaError(FieldNotFound { field: Column { relation: Some(Bare { 
table: "o" }), name: "__common_expr_1" }, valid_fields: [Column { relation: 
None, name: "__common_expr_1" }, Column { relation: None, name: 
"__common_expr_2" }, Column { relation: Some(Bare { table: "o" }), name: 
"order_id" }, Column { relation: Some(Bare { table: "o" }), name: "customer_id" 
}, Column { relation: Some(Bare { table: "o" }), name: "order_date" }, Column { 
relation: Some(Bare { table: "o" }), name: "status" }, Column { relation: 
Some(Bare { table: "o" }), name: "channel" }] }, Some(""))
   ```
   
   The CSE (common subexpression elimination) pass appears to be introducing 
this error, so let's disable that optimizer rule and try again:
   ```rust
   ctx.remove_optimizer_rule(CommonSubexprEliminate::new().name());
   ```
   
   This results in a different error, now coming from DuckDB:
   ```
   registering tables
   Unoptimized query:
   success
   building optimized plan
   unparsing optimized plan
   Generated SQL:
   SELECT "oi"."item_id", "oi"."product_id", "oi"."quantity", 
"oi"."unit_price", "oi"."line_total", "o"."order_id", "o"."customer_id", 
"o"."order_date", "o"."status", "o"."channel", "o"."discount_pct", 
"o"."shipping_cost", "o"."is_fulfilled", "p"."category_id", "p"."sku", 
"p"."product_name", "p"."price", "p"."cost", "p"."weight_kg", "p"."is_active", 
"p"."stock_qty", "p"."gross_margin", "p"."margin_pct", "p"."category_name" FROM 
(SELECT "oi"."item_id", "oi"."product_id", "oi"."quantity", "oi"."unit_price", 
"oi"."line_total", "o"."order_id", "o"."customer_id", "o"."order_date", 
"o"."status", "o"."channel", "o"."discount_pct", "o"."shipping_cost", 
"o"."is_fulfilled" FROM (SELECT "oi"."item_id", "oi"."order_id", 
"oi"."product_id", "oi"."quantity", "oi"."unit_price", (CAST("oi"."quantity" AS 
DECIMAL(10,0)) * "oi"."unit_price") AS "line_total" FROM 
"warehouse"."main"."order_items" AS "oi") AS "oi" INNER JOIN (SELECT 
"o"."order_id", "o"."customer_id", "o"."order_date", lower("o"."status") 
AS "status", lower("o"."channel") AS "channel", CASE WHEN 
CAST("o"."discount_pct" AS DECIMAL(22,2)) IS NOT NULL THEN 
CAST("o"."discount_pct" AS DECIMAL(22,2)) ELSE 0.00 END AS "discount_pct", CASE 
WHEN CAST("o"."shipping_cost" AS DECIMAL(22,2)) IS NOT NULL THEN 
CAST("o"."shipping_cost" AS DECIMAL(22,2)) ELSE 0.00 END AS "shipping_cost", 
(("o"."status" = 'completed') OR ("o"."status" = 'shipped')) AS "is_fulfilled" 
FROM "warehouse"."main"."orders" AS "o") AS "o" ON "oi"."order_id" = 
"o"."order_id") INNER JOIN (SELECT "p"."product_id", "p"."category_id", 
"p"."sku", "p"."name" AS "product_name", "p"."price", "p"."cost", 
"p"."weight_kg", "p"."is_active", "p"."stock_qty", round(("p"."price" - 
"p"."cost"), 2) AS "gross_margin", round((("p"."price" - "p"."cost") // 
nullif(CAST("p"."price" AS DECIMAL(22,2)), 0.00)), 4) AS "margin_pct", 
"c"."name" AS "category_name" FROM "warehouse"."main"."products" AS "p" LEFT 
OUTER JOIN "warehouse"."main"."categories" AS "c" USING("category_id")) AS "p" 
ON "oi"."product_id" = "p"."product_id"
   Optimized query:
   Error: DuckDBFailure(Error { code: Unknown, extended_code: 1 }, Some("Binder 
Error: Referenced table \"oi\" not found!\nCandidate tables: \"p\"\n\nLINE 1: 
...\".\"main\".\"categories\" AS \"c\" USING(\"category_id\")) AS \"p\" ON 
\"oi\".\"product_id\" = \"p\".\"product_id\"\n 
   ```
   
   
   ### Expected behavior
   
   Both the unoptimized and the optimized plans should unparse into executable 
SQL. I don't expect the two SQL statements to be identical, but I do expect 
them to be logically equivalent.
   
   ### Additional context
   
   As far as I understand the `Unparser`, this is unintended and an optimized 
`LogicalPlan` should still be able to be unparsed into valid SQL. I've run into 
quite a lot of similar problems where the resulting SQL is mostly correct but 
there are either missing or incorrect `SubqueryAlias` nodes that result in 
ambiguity in the resulting query. To me this seems more likely to be a bug in 
how the `Unparser` processes plans that have been rewritten by the optimizer, 
rather than a bug in the optimizer. The optimized `LogicalPlan`s that are 
generated look correct to me.
   
   Note, I've also seen problems relating to the `group_alias_N` aliases that 
are created by the 
[SingleDistinctToGroupBy](https://docs.rs/datafusion/latest/datafusion/optimizer/single_distinct_to_groupby/struct.SingleDistinctToGroupBy.html)
 pass. However, I haven't included them here because I haven't yet been able to 
put together a simple reproducible example.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to