erratic-pattern opened a new issue, #19049: URL: https://github.com/apache/datafusion/issues/19049
### Describe the bug

DataFusion fails with a schema mismatch error when processing a `UNION ALL` query on parquet files with field metadata.

`Error while planning query: Internal error: Physical input schema should be the same as the one converted from logical input schema. Differences: .`

### To Reproduce

Parquet: [union_all_repo.zip](https://github.com/user-attachments/files/23885649/union_all_repo.zip)

Query:
```sql
SET datafusion.execution.parquet.skip_metadata = false;

SELECT AVG(usage_idle), AVG(usage_system)
FROM (
    SELECT time, usage_idle, NULL::DOUBLE as usage_system FROM 'union_all_repro.parquet'
    UNION ALL
    SELECT time, NULL::DOUBLE as usage_idle, usage_system FROM 'union_all_repro.parquet'
);
```

### Expected behavior

The query should succeed with matching logical and physical schemas.

### Additional context

## Sequence of Events

### 1. Projection Optimization

[`optimize_projections`](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/optimize_projections/mod.rs) sees that the `time` column is unused and removes it from the inputs of the `Union`.

```
+------------------------------------------------------------+------
| plan_type                                                  | plan
+------------------------------------------------------------+------
| initial_logical_plan                                       | Projection: avg(usage_idle), avg(usage_system)
|                                                            |   Aggregate: groupBy=[[]], aggr=[[avg(usage_idle), avg(usage_system)]]
|                                                            |     Union
|                                                            |       Projection: ./union_all_repro.parquet.time, ./union_all_repro.parquet.usage_idle, CAST(NULL AS Float64) AS usage_system
|                                                            |         TableScan: ./union_all_repro.parquet
|                                                            |       Projection: ./union_all_repro.parquet.time, CAST(NULL AS Float64) AS usage_idle, ./union_all_repro.parquet.usage_system
|                                                            |         TableScan: ./union_all_repro.parquet
| logical_plan after resolve_grouping_function               | SAME TEXT AS ABOVE
| logical_plan after type_coercion                           | SAME TEXT AS ABOVE
| analyzed_logical_plan                                      | SAME TEXT AS ABOVE
| logical_plan after eliminate_nested_union                  | SAME TEXT AS ABOVE
| logical_plan after simplify_expressions                    | Projection: avg(usage_idle), avg(usage_system)
|                                                            |   Aggregate: groupBy=[[]], aggr=[[avg(usage_idle), avg(usage_system)]]
|                                                            |     Union
|                                                            |       Projection: ./union_all_repro.parquet.time, ./union_all_repro.parquet.usage_idle, Float64(NULL) AS usage_system
|                                                            |         TableScan: ./union_all_repro.parquet
|                                                            |       Projection: ./union_all_repro.parquet.time, Float64(NULL) AS usage_idle, ./union_all_repro.parquet.usage_system
|                                                            |         TableScan: ./union_all_repro.parquet
| logical_plan after replace_distinct_aggregate              | SAME TEXT AS ABOVE
| logical_plan after eliminate_join                          | SAME TEXT AS ABOVE
| logical_plan after decorrelate_predicate_subquery          | SAME TEXT AS ABOVE
| logical_plan after scalar_subquery_to_join                 | SAME TEXT AS ABOVE
| logical_plan after decorrelate_lateral_join                | SAME TEXT AS ABOVE
| logical_plan after extract_equijoin_predicate              | SAME TEXT AS ABOVE
| logical_plan after eliminate_duplicated_expr               | SAME TEXT AS ABOVE
| logical_plan after eliminate_filter                        | SAME TEXT AS ABOVE
| logical_plan after eliminate_cross_join                    | SAME TEXT AS ABOVE
| logical_plan after eliminate_limit                         | SAME TEXT AS ABOVE
| logical_plan after propagate_empty_relation                | SAME TEXT AS ABOVE
| logical_plan after eliminate_one_union                     | SAME TEXT AS ABOVE
| logical_plan after filter_null_join_keys                   | SAME TEXT AS ABOVE
| logical_plan after eliminate_outer_join                    | SAME TEXT AS ABOVE
| logical_plan after push_down_limit                         | SAME TEXT AS ABOVE
| logical_plan after push_down_filter                        | SAME TEXT AS ABOVE
| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE
| logical_plan after eliminate_group_by_constant             | SAME TEXT AS ABOVE
| logical_plan after common_sub_expression_eliminate         | SAME TEXT AS ABOVE
| logical_plan after optimize_projections                    | Aggregate: groupBy=[[]], aggr=[[avg(usage_idle), avg(usage_system)]]
|                                                            |   Union
|                                                            |     Projection: ./union_all_repro.parquet.usage_idle, Float64(NULL) AS usage_system
|                                                            |       TableScan: ./union_all_repro.parquet projection=[usage_idle]
|                                                            |     Projection: Float64(NULL) AS usage_idle, ./union_all_repro.parquet.usage_system
|                                                            |       TableScan: ./union_all_repro.parquet projection=[usage_system]
```

### 2. Schema Recomputation

[`optimize_projections`](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/optimize_projections/mod.rs) [calls](https://github.com/apache/datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/optimizer/src/optimize_projections/mod.rs#L468) [`recompute_schema`](https://github.com/apache/arrow-datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/expr/src/logical_plan/plan.rs#L624-L756) since the plan has changed. `recompute_schema` sees that the number of fields has changed and [creates a new schema](https://github.com/influxdata/arrow-datafusion/blob/82cd7f3cdb8dbe0b63b8b62f54543641598655a0/datafusion/expr/src/logical_plan/plan.rs#L718) with [`Union::try_new`](https://github.com/influxdata/arrow-datafusion/blob/82cd7f3cdb8dbe0b63b8b62f54543641598655a0/datafusion/expr/src/logical_plan/plan.rs#L718).

[`Union::try_new`](https://github.com/influxdata/arrow-datafusion/blob/82cd7f3cdb8dbe0b63b8b62f54543641598655a0/datafusion/expr/src/logical_plan/plan.rs#L718) calls [`Union::derive_schema_from_inputs`](https://github.com/apache/datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/expr/src/logical_plan/plan.rs#L2867-L2881) to recreate the `Union` schema from the plan inputs.
For each field in the schema, it calls [`intersect_metadata_for_union`](https://github.com/apache/datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/expr/src/expr.rs#L506-L520), which only keeps metadata when the field has the same metadata across all plan inputs. Since the `NULL` literal doesn't have the same metadata as our column from parquet, the metadata gets removed from the logical schema.

### 3. Physical/Logical Schema Mismatch

During physical planning, we call [`DefaultPhysicalPlanner::map_logical_node_to_physical`](https://github.com/apache/datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/core/src/physical_planner.rs#L447-L1525), which [checks](https://github.com/apache/datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/core/src/physical_planner.rs#L683-L724) the input of the `Aggregate` node (in this case `Union`) to see if its logical schema matches its physical schema. However, because we previously removed the metadata from `Union`, the [metadata no longer matches](https://github.com/apache/datafusion/blob/7b4593f36e880ca1c43746d5c4465fff5a3901c3/datafusion/core/src/schema_equivalence.rs#L42) when compared.
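
To make steps 2 and 3 concrete, here is a minimal, self-contained Rust sketch of the interaction. It deliberately does not use DataFusion or Arrow types: `Metadata`, `Field`, `intersect_metadata`, and `schemas_match` are hypothetical stand-ins written for illustration, and the strict equality check is only an assumption about the kind of comparison that fails, not the planner's actual code.

```rust
use std::collections::HashMap;

/// Field metadata modeled as plain string key/value pairs (illustrative only).
type Metadata = HashMap<String, String>;

/// A field reduced to the two parts that matter here: name and metadata.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    metadata: Metadata,
}

/// Keep only the key/value pairs present, with identical values, in the
/// metadata of every input. Simplified stand-in for the intersection step
/// described in step 2; not DataFusion's implementation.
fn intersect_metadata(inputs: &[Metadata]) -> Metadata {
    match inputs.split_first() {
        None => Metadata::new(),
        Some((first, rest)) => {
            let mut merged = first.clone();
            for other in rest {
                // Drop any entry the other input lacks or disagrees on.
                merged.retain(|key, value| other.get(key) == Some(&*value));
            }
            merged
        }
    }
}

/// Hypothetical strict comparison: field names and metadata must both match.
fn schemas_match(logical: &[Field], physical: &[Field]) -> bool {
    logical.len() == physical.len()
        && logical.iter().zip(physical).all(|(l, p)| l == p)
}
```

With one input carrying parquet metadata (say a made-up `unit` key) and the other being a `NULL` literal with no metadata, `intersect_metadata` returns an empty map. A logical field built from that result then fails `schemas_match` against a physical field that still carries the parquet metadata, which mirrors the mismatch reported above.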
