Jefffrey commented on code in PR #22640:
URL: https://github.com/apache/datafusion/pull/22640#discussion_r3409247648


##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -6285,4 +6347,226 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_recompute_schema_union_type_mismatch() -> Result<()> {
+        use arrow::datatypes::{DataType, Field, Schema};
+
+        let schema_i32 = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+        let schema_i64 = Schema::new(vec![Field::new("a", DataType::Int64, 
false)]);
+
+        // Build a Union whose schema starts out as Int32 (matching its 
inputs).
+        let original = Union::try_new(vec![
+            Arc::new(table_scan(Some("t1"), &schema_i32, None)?.build()?),
+            Arc::new(table_scan(Some("t2"), &schema_i32, None)?.build()?),
+        ])?;
+        assert_eq!(
+            original.schema.field(0).data_type(),
+            &DataType::Int32,
+            "sanity: starting schema is Int32"
+        );
+
+        // Simulate a rewrite pass (e.g. type-coercion) that replaced the 
inputs
+        // with Int64-typed versions while leaving the Union's cached schema 
stale.
+        // Same width, different types — this is exactly the bug scenario.

Review Comment:
   ```suggestion
   ```
   
   I do not appreciate these LLM comments



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -6285,4 +6347,226 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_recompute_schema_union_type_mismatch() -> Result<()> {
+        use arrow::datatypes::{DataType, Field, Schema};
+
+        let schema_i32 = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+        let schema_i64 = Schema::new(vec![Field::new("a", DataType::Int64, 
false)]);
+
+        // Build a Union whose schema starts out as Int32 (matching its 
inputs).
+        let original = Union::try_new(vec![
+            Arc::new(table_scan(Some("t1"), &schema_i32, None)?.build()?),
+            Arc::new(table_scan(Some("t2"), &schema_i32, None)?.build()?),
+        ])?;
+        assert_eq!(
+            original.schema.field(0).data_type(),
+            &DataType::Int32,
+            "sanity: starting schema is Int32"
+        );
+
+        // Simulate a rewrite pass (e.g. type-coercion) that replaced the 
inputs
+        // with Int64-typed versions while leaving the Union's cached schema 
stale.
+        // Same width, different types — this is exactly the bug scenario.
+        let stale = LogicalPlan::Union(Union {
+            inputs: vec![
+                Arc::new(table_scan(Some("t1"), &schema_i64, None)?.build()?),
+                Arc::new(table_scan(Some("t2"), &schema_i64, None)?.build()?),
+            ],
+            schema: Arc::clone(&original.schema),
+        });
+
+        let recomputed = stale.recompute_schema()?;
+
+        assert_eq!(
+            recomputed.schema().field(0).data_type(),
+            &DataType::Int64,
+            "Union schema should track the new Int64 input types after \
+         recompute_schema(), but the width-only check left it stale"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_recompute_schema_union_name_mismatch() -> Result<()> {
+        use arrow::datatypes::{DataType, Field, Schema};
+
+        let schema_a = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+        let schema_b = Schema::new(vec![Field::new("b", DataType::Int32, 
false)]);
+
+        // Build a Union whose schema starts out with column "a".
+        let original = Union::try_new(vec![
+            Arc::new(table_scan(Some("t1"), &schema_a, None)?.build()?),
+            Arc::new(table_scan(Some("t2"), &schema_a, None)?.build()?),
+        ])?;
+        assert_eq!(
+            original.schema.field(0).name(),
+            "a",
+            "sanity: starting schema has column name 'a'"
+        );
+
+        // Simulate a rewrite pass that renamed the columns but left
+        // the cached schema stale. Same width and type, different name.
+        let stale = LogicalPlan::Union(Union {
+            inputs: vec![
+                Arc::new(table_scan(Some("t1"), &schema_b, None)?.build()?),
+                Arc::new(table_scan(Some("t2"), &schema_b, None)?.build()?),
+            ],
+            schema: Arc::clone(&original.schema),
+        });
+
+        let recomputed = stale.recompute_schema()?;
+
+        assert_eq!(
+            recomputed.schema().field(0).name(),
+            "b",
+            "Union schema should reflect the renamed column after \
+         recompute_schema(), but the width-only check left it stale"
+        );

Review Comment:
   Is this a valid case? Should the Union's column names really update like this?



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -3156,6 +3172,52 @@ impl Union {
         Ok(Union { inputs, schema })
     }
 
+    /// Constructs a new Union from inputs, deriving the schema by position
+    /// (like `try_new`) but preserving schema-level and field-level metadata
+    /// using "later takes precedence" (extend) semantics — matching the
+    /// behavior of `coerce_union_schema_with_schema`.
+    pub fn try_new_with_metadata(inputs: Vec<Arc<LogicalPlan>>) -> 
Result<Self> {

Review Comment:
   ```suggestion
       pub(crate) fn try_new_with_metadata(inputs: Vec<Arc<LogicalPlan>>) -> 
Result<Self> {
   ```
   
   Just to keep it out of the public API unless strictly necessary, to give us some flexibility in the future.



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -705,20 +705,36 @@ impl LogicalPlan {
                 }))
             }
             LogicalPlan::Union(Union { inputs, schema }) => {
+                // Fast path: check structural compatibility against all 
inputs.
+                //
+                // For position-based Union (try_new), schema names come 
exclusively
+                // from inputs[0], so we check names/qualifiers only against 
the first
+                // input. Data types and nullability must match across every 
input.

Review Comment:
   How much does this matter?



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -6285,4 +6347,226 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_recompute_schema_union_type_mismatch() -> Result<()> {
+        use arrow::datatypes::{DataType, Field, Schema};
+
+        let schema_i32 = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+        let schema_i64 = Schema::new(vec![Field::new("a", DataType::Int64, 
false)]);
+
+        // Build a Union whose schema starts out as Int32 (matching its 
inputs).
+        let original = Union::try_new(vec![
+            Arc::new(table_scan(Some("t1"), &schema_i32, None)?.build()?),
+            Arc::new(table_scan(Some("t2"), &schema_i32, None)?.build()?),
+        ])?;
+        assert_eq!(
+            original.schema.field(0).data_type(),
+            &DataType::Int32,
+            "sanity: starting schema is Int32"
+        );
+
+        // Simulate a rewrite pass (e.g. type-coercion) that replaced the 
inputs
+        // with Int64-typed versions while leaving the Union's cached schema 
stale.
+        // Same width, different types — this is exactly the bug scenario.
+        let stale = LogicalPlan::Union(Union {
+            inputs: vec![
+                Arc::new(table_scan(Some("t1"), &schema_i64, None)?.build()?),
+                Arc::new(table_scan(Some("t2"), &schema_i64, None)?.build()?),
+            ],
+            schema: Arc::clone(&original.schema),
+        });
+
+        let recomputed = stale.recompute_schema()?;
+
+        assert_eq!(
+            recomputed.schema().field(0).data_type(),
+            &DataType::Int64,
+            "Union schema should track the new Int64 input types after \
+         recompute_schema(), but the width-only check left it stale"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_recompute_schema_union_name_mismatch() -> Result<()> {
+        use arrow::datatypes::{DataType, Field, Schema};
+
+        let schema_a = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+        let schema_b = Schema::new(vec![Field::new("b", DataType::Int32, 
false)]);
+
+        // Build a Union whose schema starts out with column "a".
+        let original = Union::try_new(vec![
+            Arc::new(table_scan(Some("t1"), &schema_a, None)?.build()?),
+            Arc::new(table_scan(Some("t2"), &schema_a, None)?.build()?),
+        ])?;
+        assert_eq!(
+            original.schema.field(0).name(),
+            "a",
+            "sanity: starting schema has column name 'a'"
+        );
+
+        // Simulate a rewrite pass that renamed the columns but left
+        // the cached schema stale. Same width and type, different name.
+        let stale = LogicalPlan::Union(Union {
+            inputs: vec![
+                Arc::new(table_scan(Some("t1"), &schema_b, None)?.build()?),
+                Arc::new(table_scan(Some("t2"), &schema_b, None)?.build()?),
+            ],
+            schema: Arc::clone(&original.schema),
+        });
+
+        let recomputed = stale.recompute_schema()?;
+
+        assert_eq!(
+            recomputed.schema().field(0).name(),
+            "b",
+            "Union schema should reflect the renamed column after \
+         recompute_schema(), but the width-only check left it stale"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_recompute_schema_union_nullability_mismatch() -> Result<()> {
+        use arrow::datatypes::{DataType, Field, Schema};
+
+        // nullable: false
+        let schema_not_null = Schema::new(vec![Field::new("a", 
DataType::Int32, false)]);
+        // nullable: true
+        let schema_nullable = Schema::new(vec![Field::new("a", 
DataType::Int32, true)]);
+

Review Comment:
   ```suggestion
           let schema_not_null = Schema::new(vec![Field::new("a", 
DataType::Int32, false)]);
           let schema_nullable = Schema::new(vec![Field::new("a", 
DataType::Int32, true)]);
   ```
   
   This is already evident from the nullability argument passed to `Field::new`.



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -6285,4 +6347,226 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_recompute_schema_union_type_mismatch() -> Result<()> {
+        use arrow::datatypes::{DataType, Field, Schema};
+
+        let schema_i32 = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+        let schema_i64 = Schema::new(vec![Field::new("a", DataType::Int64, 
false)]);
+
+        // Build a Union whose schema starts out as Int32 (matching its 
inputs).
+        let original = Union::try_new(vec![
+            Arc::new(table_scan(Some("t1"), &schema_i32, None)?.build()?),
+            Arc::new(table_scan(Some("t2"), &schema_i32, None)?.build()?),
+        ])?;
+        assert_eq!(
+            original.schema.field(0).data_type(),
+            &DataType::Int32,
+            "sanity: starting schema is Int32"
+        );
+
+        // Simulate a rewrite pass (e.g. type-coercion) that replaced the 
inputs
+        // with Int64-typed versions while leaving the Union's cached schema 
stale.
+        // Same width, different types — this is exactly the bug scenario.
+        let stale = LogicalPlan::Union(Union {
+            inputs: vec![
+                Arc::new(table_scan(Some("t1"), &schema_i64, None)?.build()?),
+                Arc::new(table_scan(Some("t2"), &schema_i64, None)?.build()?),
+            ],
+            schema: Arc::clone(&original.schema),
+        });
+
+        let recomputed = stale.recompute_schema()?;
+
+        assert_eq!(
+            recomputed.schema().field(0).data_type(),
+            &DataType::Int64,
+            "Union schema should track the new Int64 input types after \
+         recompute_schema(), but the width-only check left it stale"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_recompute_schema_union_name_mismatch() -> Result<()> {
+        use arrow::datatypes::{DataType, Field, Schema};
+
+        let schema_a = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+        let schema_b = Schema::new(vec![Field::new("b", DataType::Int32, 
false)]);
+
+        // Build a Union whose schema starts out with column "a".
+        let original = Union::try_new(vec![
+            Arc::new(table_scan(Some("t1"), &schema_a, None)?.build()?),
+            Arc::new(table_scan(Some("t2"), &schema_a, None)?.build()?),
+        ])?;
+        assert_eq!(
+            original.schema.field(0).name(),
+            "a",
+            "sanity: starting schema has column name 'a'"
+        );
+
+        // Simulate a rewrite pass that renamed the columns but left
+        // the cached schema stale. Same width and type, different name.
+        let stale = LogicalPlan::Union(Union {
+            inputs: vec![
+                Arc::new(table_scan(Some("t1"), &schema_b, None)?.build()?),
+                Arc::new(table_scan(Some("t2"), &schema_b, None)?.build()?),
+            ],
+            schema: Arc::clone(&original.schema),
+        });
+
+        let recomputed = stale.recompute_schema()?;
+
+        assert_eq!(
+            recomputed.schema().field(0).name(),
+            "b",
+            "Union schema should reflect the renamed column after \
+         recompute_schema(), but the width-only check left it stale"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_recompute_schema_union_nullability_mismatch() -> Result<()> {
+        use arrow::datatypes::{DataType, Field, Schema};
+
+        // nullable: false
+        let schema_not_null = Schema::new(vec![Field::new("a", 
DataType::Int32, false)]);
+        // nullable: true
+        let schema_nullable = Schema::new(vec![Field::new("a", 
DataType::Int32, true)]);
+
+        // Build Union starting with NOT NULL inputs.
+        let original = Union::try_new(vec![
+            Arc::new(table_scan(Some("t1"), &schema_not_null, None)?.build()?),
+            Arc::new(table_scan(Some("t2"), &schema_not_null, None)?.build()?),
+        ])?;
+        assert!(
+            !original.schema.field(0).is_nullable(),
+            "sanity: starting schema field is NOT NULL"
+        );
+
+        // Simulate a rewrite that made the inputs nullable while leaving
+        // the Union's cached schema stale.
+        let stale = LogicalPlan::Union(Union {
+            inputs: vec![
+                Arc::new(table_scan(Some("t1"), &schema_nullable, 
None)?.build()?),
+                Arc::new(table_scan(Some("t2"), &schema_nullable, 
None)?.build()?),
+            ],
+            schema: Arc::clone(&original.schema),
+        });
+
+        let recomputed = stale.recompute_schema()?;
+
+        assert!(
+            recomputed.schema().field(0).is_nullable(),
+            "Union schema should reflect the new nullable inputs after \
+         recompute_schema(), but the stale NOT NULL schema was kept"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_recompute_schema_union_metadata_preservation() -> Result<()> {
+        use arrow::datatypes::{DataType, Field, Schema};
+        use std::collections::HashMap;
+
+        let mut meta1 = HashMap::new();
+        meta1.insert("k1".to_string(), "v1".to_string());
+        let mut meta2 = HashMap::new();
+        meta2.insert("k1".to_string(), "v2".to_string()); // duplicate key, 
different value
+        meta2.insert("k2".to_string(), "v2".to_string());
+
+        let schema1 = Schema::new_with_metadata(
+            vec![Field::new("a", DataType::Int32, false)],
+            meta1.clone(),
+        );
+        let schema2 = Schema::new_with_metadata(

Review Comment:
   This can be tested without `recompute_schema()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to