alamb commented on code in PR #11506:
URL: https://github.com/apache/datafusion/pull/11506#discussion_r1687056920


##########
datafusion/sqllogictest/test_files/order.slt:
##########
@@ -1132,3 +1132,10 @@ physical_plan
 02)--ProjectionExec: expr=[CAST(inc_col@0 > desc_col@1 AS Int32) as c]
 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 04)------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, 
projection=[inc_col, desc_col], output_orderings=[[inc_col@0 ASC NULLS LAST], 
[desc_col@1 DESC]], has_header=true
+
+# Union a query with the actual data and one with a constant
+query I
+SELECT (SELECT c from ordered_table ORDER BY c LIMIT 1) UNION ALL (SELECT 23 
as c from ordered_table ORDER BY c LIMIT 1) ORDER BY c;

Review Comment:
   I also tried adding another constant to the `ORDER BY` and it works ✅
   
   ```
   query I
   SELECT (SELECT c from ordered_table ORDER BY 'A', c LIMIT 1) UNION ALL 
(SELECT 23 as c from ordered_table ORDER BY 'B', c LIMIT 1) ORDER BY c;
   ----
   0
   23
   ```



##########
datafusion/physical-expr-common/src/physical_expr.rs:
##########
@@ -191,6 +193,33 @@ pub fn with_new_children_if_necessary(
     }
 }
 
+/// Rewrites an expression according to new schema; i.e. changes the columns it
+/// refers to with the column at corresponding index in the new schema. Returns
+/// an error if the given schema has fewer columns than the original schema.
+/// Note that the resulting expression may not be valid if data types in the
+/// new schema is incompatible with expression nodes.
+pub fn with_new_schema(

Review Comment:
   👍 



##########
datafusion/physical-expr-common/src/physical_expr.rs:
##########
@@ -191,6 +193,33 @@ pub fn with_new_children_if_necessary(
     }
 }
 
+/// Rewrites an expression according to new schema; i.e. changes the columns it
+/// refers to with the column at corresponding index in the new schema. Returns
+/// an error if the given schema has fewer columns than the original schema.
+/// Note that the resulting expression may not be valid if data types in the
+/// new schema is incompatible with expression nodes.
+pub fn with_new_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    schema: &SchemaRef,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    Ok(expr
+        .transform_up(|expr| {
+            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+                let idx = col.index();
+                let Some(field) = schema.fields().get(idx) else {
+                    return plan_err!(

Review Comment:
   Perhaps this should be an internal error rather than a `plan_err!`.



##########
datafusion/physical-expr/src/equivalence/properties.rs:
##########
@@ -1484,6 +1527,84 @@ impl Hash for ExprWrapper {
     }
 }
 
+/// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties`
+/// of  `lhs` and `rhs` according to the schema of `lhs`.
+fn calculate_union_binary(
+    lhs: EquivalenceProperties,
+    mut rhs: EquivalenceProperties,
+) -> Result<EquivalenceProperties> {
+    // TODO: In some cases, we should be able to preserve some equivalence
+    //       classes. Add support for such cases.
+
+    // Harmonize the schema of the rhs with the schema of the lhs (which is 
the accumulator schema):
+    if !rhs.schema.eq(&lhs.schema) {
+        rhs = rhs.with_new_schema(Arc::clone(&lhs.schema))?;
+    }
+
+    // First, calculate valid constants for the union. A quantity is constant
+    // after the union if it is constant in both sides.
+    let constants = lhs
+        .constants()
+        .iter()
+        .filter(|const_expr| const_exprs_contains(rhs.constants(), 
const_expr.expr()))
+        .map(|const_expr| {
+            // TODO: When both sides' constants are valid across partitions,
+            //       the union's constant should also be valid if values are
+            //       the same. However, we do not have the capability to
+            //       check this yet.
+            
ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false)
+        })
+        .collect();
+
+    // Next, calculate valid orderings for the union by searching for prefixes
+    // in both sides.
+    let mut orderings = vec![];
+    for mut ordering in lhs.normalized_oeq_class().orderings {
+        // Progressively shorten the ordering to search for a satisfied prefix:
+        while !rhs.ordering_satisfy(&ordering) {

Review Comment:
   Do we also need to check `!ordering.is_empty()` here, or is that implicit in 
`ordering_satisfy`?



##########
datafusion/physical-plan/src/union.rs:
##########
@@ -99,7 +99,12 @@ impl UnionExec {
     /// Create a new UnionExec
     pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
         let schema = union_schema(&inputs);
-        let cache = Self::compute_properties(&inputs, schema);
+        // The schema of the inputs and the union schema is consistent when:
+        // - They have the same number of fields, and
+        // - Their fields have same types at the same indices.
+        // Here, we know that schemas are consistent and the call below can
+        // not return an error.
+        let cache = Self::compute_properties(&inputs, schema).unwrap();

Review Comment:
   A slightly more defensive API might be to make this `try_new()` and return a 
`Result<Self>` rather than panic. However, I think this is fine too.



##########
datafusion/physical-expr/src/equivalence/properties.rs:
##########
@@ -1484,6 +1527,84 @@ impl Hash for ExprWrapper {
     }
 }
 
+/// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties`
+/// of  `lhs` and `rhs` according to the schema of `lhs`.
+fn calculate_union_binary(
+    lhs: EquivalenceProperties,
+    mut rhs: EquivalenceProperties,
+) -> Result<EquivalenceProperties> {
+    // TODO: In some cases, we should be able to preserve some equivalence

Review Comment:
   I know this comment is copied from the old code, so no need to change it. 
   
   However, it would be nice if we could give some hints about what could be 
preserved (I think there is already some non-trivial support for preserving 
equivalence classes).



##########
datafusion/physical-expr/src/equivalence/properties.rs:
##########
@@ -1484,6 +1527,84 @@ impl Hash for ExprWrapper {
     }
 }
 
+/// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties`
+/// of  `lhs` and `rhs` according to the schema of `lhs`.
+fn calculate_union_binary(
+    lhs: EquivalenceProperties,
+    mut rhs: EquivalenceProperties,
+) -> Result<EquivalenceProperties> {
+    // TODO: In some cases, we should be able to preserve some equivalence
+    //       classes. Add support for such cases.
+
+    // Harmonize the schema of the rhs with the schema of the lhs (which is 
the accumulator schema):
+    if !rhs.schema.eq(&lhs.schema) {
+        rhs = rhs.with_new_schema(Arc::clone(&lhs.schema))?;
+    }
+
+    // First, calculate valid constants for the union. A quantity is constant
+    // after the union if it is constant in both sides.
+    let constants = lhs
+        .constants()
+        .iter()
+        .filter(|const_expr| const_exprs_contains(rhs.constants(), 
const_expr.expr()))
+        .map(|const_expr| {
+            // TODO: When both sides' constants are valid across partitions,
+            //       the union's constant should also be valid if values are
+            //       the same. However, we do not have the capability to
+            //       check this yet.
+            
ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false)
+        })
+        .collect();
+
+    // Next, calculate valid orderings for the union by searching for prefixes
+    // in both sides.
+    let mut orderings = vec![];
+    for mut ordering in lhs.normalized_oeq_class().orderings {
+        // Progressively shorten the ordering to search for a satisfied prefix:
+        while !rhs.ordering_satisfy(&ordering) {
+            ordering.pop();
+        }
+        // There is a non-trivial satisfied prefix, add it as a valid ordering:
+        if !ordering.is_empty() {
+            orderings.push(ordering);
+        }
+    }
+    for mut ordering in rhs.normalized_oeq_class().orderings {
+        // Progressively shorten the ordering to search for a satisfied prefix:
+        while !lhs.ordering_satisfy(&ordering) {
+            ordering.pop();
+        }
+        // There is a non-trivial satisfied prefix, add it as a valid ordering:
+        if !ordering.is_empty() {
+            orderings.push(ordering);
+        }
+    }
+    let mut eq_properties = EquivalenceProperties::new(lhs.schema);
+    eq_properties.constants = constants;
+    eq_properties.add_new_orderings(orderings);
+    Ok(eq_properties)
+}
+
+/// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties`
+/// of the given `EquivalenceProperties` in `eqps` according to the given
+/// output `schema` (which need not be the same with those of `lhs` and `rhs`
+/// as details such as nullability may be different).
+pub fn calculate_union(
+    eqps: Vec<EquivalenceProperties>,
+    schema: SchemaRef,
+) -> Result<EquivalenceProperties> {
+    // TODO: In some cases, we should be able to preserve some equivalence

Review Comment:
   Same nitpick as above (regarding the TODO about preserving equivalence classes).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to