alamb commented on code in PR #16217:
URL: https://github.com/apache/datafusion/pull/16217#discussion_r2123545316


##########
datafusion/physical-expr/src/equivalence/properties/dependency.rs:
##########
@@ -907,72 +796,13 @@ mod tests {
         for (exprs, expected) in test_cases {
             let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
             let expected = convert_to_sort_exprs(&expected);
-            let (actual, _) = eq_properties.find_longest_permutation(&exprs);
+            let (actual, _) = eq_properties.find_longest_permutation(&exprs)?;
             assert_eq!(actual, expected);
         }
 
         Ok(())
     }
 
-    #[test]
-    fn test_get_finer() -> Result<()> {

Review Comment:
   Note to other reviewers, this test appears to have been moved down farther 
in the file and renamed to `test_requirements_compatible` (not removed)



##########
datafusion/physical-expr/src/equivalence/properties/mod.rs:
##########
@@ -125,40 +120,85 @@ use itertools::Itertools;
 /// # let col_c = col("c", &schema).unwrap();
 /// // This object represents data that is sorted by a ASC, c DESC
 /// // with a single constant value of b
-/// let mut eq_properties = EquivalenceProperties::new(schema)
-///   .with_constants(vec![ConstExpr::from(col_b)]);
-/// eq_properties.add_new_ordering(LexOrdering::new(vec![
+/// let mut eq_properties = EquivalenceProperties::new(schema);
+/// eq_properties.add_constants(vec![ConstExpr::from(col_b)]);
+/// eq_properties.add_ordering([
 ///   PhysicalSortExpr::new_default(col_a).asc(),
 ///   PhysicalSortExpr::new_default(col_c).desc(),
-/// ]));
+/// ]);
 ///
-/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], 
const: [b@1(heterogeneous)]")
+/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], eq: 
[{members: [b@1], constant: (heterogeneous)}]");
 /// ```
-#[derive(Debug, Clone)]
+#[derive(Clone, Debug)]
 pub struct EquivalenceProperties {
-    /// Distinct equivalence classes (exprs known to have the same expressions)
+    /// Distinct equivalence classes (i.e. expressions with the same value).
     eq_group: EquivalenceGroup,
-    /// Equivalent sort expressions
+    /// Equivalent sort expressions (i.e. those define the same ordering).
     oeq_class: OrderingEquivalenceClass,
-    /// Expressions whose values are constant
-    ///
-    /// TODO: We do not need to track constants separately, they can be tracked
-    ///       inside `eq_group` as `Literal` expressions.
-    constants: Vec<ConstExpr>,
-    /// Table constraints
+    /// Cache storing equivalent sort expressions in normal form (i.e. without
+    /// constants/duplicates and in standard form) and a map associating 
leading
+    /// terms with full sort expressions.
+    oeq_cache: OrderingEquivalenceCache,

Review Comment:
   👍 



##########
datafusion/physical-expr/src/equivalence/properties/mod.rs:
##########
@@ -190,382 +241,363 @@ impl EquivalenceProperties {
         &self.oeq_class
     }
 
-    /// Return the inner OrderingEquivalenceClass, consuming self
-    pub fn into_oeq_class(self) -> OrderingEquivalenceClass {
-        self.oeq_class
-    }
-
     /// Returns a reference to the equivalence group within.
     pub fn eq_group(&self) -> &EquivalenceGroup {
         &self.eq_group
     }
 
-    /// Returns a reference to the constant expressions
-    pub fn constants(&self) -> &[ConstExpr] {
-        &self.constants
-    }
-
+    /// Returns a reference to the constraints within.
     pub fn constraints(&self) -> &Constraints {
         &self.constraints
     }
 
-    /// Returns the output ordering of the properties.
-    pub fn output_ordering(&self) -> Option<LexOrdering> {
-        let constants = self.constants();
-        let mut output_ordering = 
self.oeq_class().output_ordering().unwrap_or_default();
-        // Prune out constant expressions
-        output_ordering
-            .retain(|sort_expr| !const_exprs_contains(constants, 
&sort_expr.expr));
-        (!output_ordering.is_empty()).then_some(output_ordering)
+    /// Returns all the known constants expressions.
+    pub fn constants(&self) -> Vec<ConstExpr> {
+        self.eq_group
+            .iter()
+            .filter_map(|c| {
+                c.constant.as_ref().and_then(|across| {
+                    c.canonical_expr()
+                        .map(|expr| ConstExpr::new(Arc::clone(expr), 
across.clone()))
+                })
+            })
+            .collect()
     }
 
-    /// Returns the normalized version of the ordering equivalence class 
within.
-    /// Normalization removes constants and duplicates as well as standardizing
-    /// expressions according to the equivalence group within.
-    pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
-        OrderingEquivalenceClass::new(
-            self.oeq_class
-                .iter()
-                .map(|ordering| self.normalize_sort_exprs(ordering))
-                .collect(),
-        )
+    /// Returns the output ordering of the properties.
+    pub fn output_ordering(&self) -> Option<LexOrdering> {
+        let concat = self.oeq_class.iter().flat_map(|o| o.iter().cloned());
+        self.normalize_sort_exprs(concat)
     }
 
     /// Extends this `EquivalenceProperties` with the `other` object.
-    pub fn extend(mut self, other: Self) -> Self {
-        self.eq_group.extend(other.eq_group);
-        self.oeq_class.extend(other.oeq_class);
-        self.with_constants(other.constants)
+    pub fn extend(mut self, other: Self) -> Result<Self> {
+        self.constraints.extend(other.constraints);
+        self.add_equivalence_group(other.eq_group)?;
+        self.add_orderings(other.oeq_class);
+        Ok(self)
     }
 
     /// Clears (empties) the ordering equivalence class within this object.
     /// Call this method when existing orderings are invalidated.
     pub fn clear_orderings(&mut self) {
         self.oeq_class.clear();
+        self.oeq_cache.clear();
     }
 
     /// Removes constant expressions that may change across partitions.
-    /// This method should be used when data from different partitions are 
merged.
+    /// This method should be used when merging data from different partitions.
     pub fn clear_per_partition_constants(&mut self) {
-        self.constants.retain(|item| {
-            matches!(item.across_partitions(), AcrossPartitions::Uniform(_))
-        })
-    }
-
-    /// Extends this `EquivalenceProperties` by adding the orderings inside the
-    /// ordering equivalence class `other`.
-    pub fn add_ordering_equivalence_class(&mut self, other: 
OrderingEquivalenceClass) {
-        self.oeq_class.extend(other);
+        if self.eq_group.clear_per_partition_constants() {
+            // Renormalize orderings if the equivalence group changes:
+            let normal_orderings = self
+                .oeq_class
+                .iter()
+                .cloned()
+                .map(|o| self.eq_group.normalize_sort_exprs(o));
+            self.oeq_cache = OrderingEquivalenceCache::new(normal_orderings);
+        }
     }
 
     /// Adds new orderings into the existing ordering equivalence class.
-    pub fn add_new_orderings(
+    pub fn add_orderings(
         &mut self,
-        orderings: impl IntoIterator<Item = LexOrdering>,
+        orderings: impl IntoIterator<Item = impl IntoIterator<Item = 
PhysicalSortExpr>>,
     ) {
-        self.oeq_class.add_new_orderings(orderings);
+        let orderings: Vec<_> =
+            orderings.into_iter().filter_map(LexOrdering::new).collect();
+        let normal_orderings: Vec<_> = orderings
+            .iter()
+            .cloned()
+            .filter_map(|o| self.normalize_sort_exprs(o))
+            .collect();
+        if !normal_orderings.is_empty() {
+            self.oeq_class.extend(orderings);
+            // Normalize given orderings to update the cache:
+            self.oeq_cache.normal_cls.extend(normal_orderings);
+            // TODO: If no ordering is found to be redunant during extension, 
we
+            //       can use a shortcut algorithm to update the leading map.
+            self.oeq_cache.update_map();
+        }
     }
 
     /// Adds a single ordering to the existing ordering equivalence class.
-    pub fn add_new_ordering(&mut self, ordering: LexOrdering) {
-        self.add_new_orderings([ordering]);
+    pub fn add_ordering(&mut self, ordering: impl IntoIterator<Item = 
PhysicalSortExpr>) {
+        self.add_orderings(std::iter::once(ordering));
     }
 
     /// Incorporates the given equivalence group to into the existing
     /// equivalence group within.
-    pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
-        self.eq_group.extend(other_eq_group);
+    pub fn add_equivalence_group(
+        &mut self,
+        other_eq_group: EquivalenceGroup,
+    ) -> Result<()> {
+        if !other_eq_group.is_empty() {
+            self.eq_group.extend(other_eq_group);
+            // Renormalize orderings if the equivalence group changes:
+            let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
+            let normal_orderings = normal_cls
+                .into_iter()
+                .map(|o| self.eq_group.normalize_sort_exprs(o));
+            self.oeq_cache.normal_cls = 
OrderingEquivalenceClass::new(normal_orderings);
+            self.oeq_cache.update_map();
+            // Discover any new orderings based on the new equivalence classes:
+            let leading_exprs: Vec<_> =
+                self.oeq_cache.leading_map.keys().cloned().collect();
+            for expr in leading_exprs {
+                self.discover_new_orderings(expr)?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Returns the ordering equivalence class within in normal form.
+    /// Normalization standardizes expressions according to the equivalence
+    /// group within, and removes constants/duplicates.
+    pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
+        self.oeq_class
+            .iter()
+            .cloned()
+            .filter_map(|ordering| self.normalize_sort_exprs(ordering))
+            .collect::<Vec<_>>()
+            .into()
     }
 
     /// Adds a new equality condition into the existing equivalence group.
     /// If the given equality defines a new equivalence class, adds this new
     /// equivalence class to the equivalence group.
     pub fn add_equal_conditions(
         &mut self,
-        left: &Arc<dyn PhysicalExpr>,
-        right: &Arc<dyn PhysicalExpr>,
+        left: Arc<dyn PhysicalExpr>,
+        right: Arc<dyn PhysicalExpr>,
     ) -> Result<()> {
-        // Discover new constants in light of new the equality:
-        if self.is_expr_constant(left) {
-            // Left expression is constant, add right as constant
-            if !const_exprs_contains(&self.constants, right) {
-                let const_expr = ConstExpr::from(right)
-                    
.with_across_partitions(self.get_expr_constant_value(left));
-                self.constants.push(const_expr);
-            }
-        } else if self.is_expr_constant(right) {
-            // Right expression is constant, add left as constant
-            if !const_exprs_contains(&self.constants, left) {
-                let const_expr = ConstExpr::from(left)
-                    
.with_across_partitions(self.get_expr_constant_value(right));
-                self.constants.push(const_expr);
-            }
+        // Add equal expressions to the state:
+        if self.eq_group.add_equal_conditions(Arc::clone(&left), right) {
+            // Renormalize orderings if the equivalence group changes:
+            let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
+            let normal_orderings = normal_cls
+                .into_iter()
+                .map(|o| self.eq_group.normalize_sort_exprs(o));
+            self.oeq_cache.normal_cls = 
OrderingEquivalenceClass::new(normal_orderings);
+            self.oeq_cache.update_map();
+            // Discover any new orderings:
+            self.discover_new_orderings(left)?;
         }
-
-        // Add equal expressions to the state
-        self.eq_group.add_equal_conditions(left, right);
-
-        // Discover any new orderings
-        self.discover_new_orderings(left)?;
         Ok(())
     }
 
     /// Track/register physical expressions with constant values.
-    #[deprecated(since = "43.0.0", note = "Use [`with_constants`] instead")]
-    pub fn add_constants(self, constants: impl IntoIterator<Item = ConstExpr>) 
-> Self {
-        self.with_constants(constants)
-    }
-
-    /// Remove the specified constant
-    pub fn remove_constant(mut self, c: &ConstExpr) -> Self {
-        self.constants.retain(|existing| existing != c);
-        self
-    }
-
-    /// Track/register physical expressions with constant values.
-    pub fn with_constants(
-        mut self,
+    pub fn add_constants(
+        &mut self,
         constants: impl IntoIterator<Item = ConstExpr>,
-    ) -> Self {
-        let normalized_constants = constants
-            .into_iter()
-            .filter_map(|c| {
-                let across_partitions = c.across_partitions();
-                let expr = c.owned_expr();
-                let normalized_expr = self.eq_group.normalize_expr(expr);
-
-                if const_exprs_contains(&self.constants, &normalized_expr) {
-                    return None;
-                }
-
-                let const_expr = ConstExpr::from(normalized_expr)
-                    .with_across_partitions(across_partitions);
-
-                Some(const_expr)
+    ) -> Result<()> {
+        // Add the new constant to the equivalence group:
+        for constant in constants {
+            self.eq_group.add_constant(constant);
+        }
+        // Renormalize the orderings after adding new constants by removing
+        // the constants from existing orderings:
+        let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
+        let normal_orderings = normal_cls.into_iter().map(|ordering| {
+            ordering.into_iter().filter(|sort_expr| {
+                self.eq_group.is_expr_constant(&sort_expr.expr).is_none()
             })
-            .collect::<Vec<_>>();
-
-        // Add all new normalized constants
-        self.constants.extend(normalized_constants);
-
-        // Discover any new orderings based on the constants
-        for ordering in self.normalized_oeq_class().iter() {
-            if let Err(e) = self.discover_new_orderings(&ordering[0].expr) {
-                log::debug!("error discovering new orderings: {e}");
-            }
+        });
+        self.oeq_cache.normal_cls = 
OrderingEquivalenceClass::new(normal_orderings);
+        self.oeq_cache.update_map();
+        // Discover any new orderings based on the constants:
+        let leading_exprs: Vec<_> = 
self.oeq_cache.leading_map.keys().cloned().collect();
+        for expr in leading_exprs {
+            self.discover_new_orderings(expr)?;
         }
-
-        self
+        Ok(())
     }
 
-    // Discover new valid orderings in light of a new equality.
-    // Accepts a single argument (`expr`) which is used to determine
-    // which orderings should be updated.
-    // When constants or equivalence classes are changed, there may be new 
orderings
-    // that can be discovered with the new equivalence properties.
-    // For a discussion, see: https://github.com/apache/datafusion/issues/9812
-    fn discover_new_orderings(&mut self, expr: &Arc<dyn PhysicalExpr>) -> 
Result<()> {
-        let normalized_expr = self.eq_group().normalize_expr(Arc::clone(expr));
+    /// Discover new valid orderings in light of a new equality. Accepts a 
single
+    /// argument (`expr`) which is used to determine the orderings to update.
+    /// When constants or equivalence classes change, there may be new 
orderings
+    /// that can be discovered with the new equivalence properties.
+    /// For a discussion, see: 
<https://github.com/apache/datafusion/issues/9812>
+    fn discover_new_orderings(
+        &mut self,
+        normal_expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<()> {
+        let Some(ordering_idxs) = self.oeq_cache.leading_map.get(&normal_expr) 
else {
+            return Ok(());
+        };
         let eq_class = self
             .eq_group
-            .iter()
-            .find_map(|class| {
-                class
-                    .contains(&normalized_expr)
-                    .then(|| class.clone().into_vec())
-            })
-            .unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]);
-
-        let mut new_orderings: Vec<LexOrdering> = vec![];
-        for ordering in self.normalized_oeq_class().iter() {
-            if !ordering[0].expr.eq(&normalized_expr) {
-                continue;
-            }
+            .get_equivalence_class(&normal_expr)
+            .map_or_else(|| vec![normal_expr], |class| class.clone().into());
 
+        let mut new_orderings = vec![];
+        for idx in ordering_idxs {
+            let ordering = &self.oeq_cache.normal_cls[*idx];
             let leading_ordering_options = ordering[0].options;
 
-            for equivalent_expr in &eq_class {
+            'exprs: for equivalent_expr in &eq_class {
                 let children = equivalent_expr.children();
                 if children.is_empty() {
                     continue;
                 }
-
-                // Check if all children match the next expressions in the 
ordering
-                let mut all_children_match = true;
+                // Check if all children match the next expressions in the 
ordering:
                 let mut child_properties = vec![];
-
-                // Build properties for each child based on the next 
expressions
-                for (i, child) in children.iter().enumerate() {
-                    if let Some(next) = ordering.get(i + 1) {
-                        if !child.as_ref().eq(next.expr.as_ref()) {
-                            all_children_match = false;
-                            break;
-                        }
-                        child_properties.push(ExprProperties {
-                            sort_properties: 
SortProperties::Ordered(next.options),
-                            range: Interval::make_unbounded(
-                                &child.data_type(&self.schema)?,
-                            )?,
-                            preserves_lex_ordering: true,
-                        });
-                    } else {
-                        all_children_match = false;
-                        break;
+                // Build properties for each child based on the next 
expression:
+                for (i, child) in children.into_iter().enumerate() {
+                    let Some(next) = ordering.get(i + 1) else {
+                        break 'exprs;
+                    };
+                    if !next.expr.eq(child) {
+                        break 'exprs;
                     }
+                    let data_type = child.data_type(&self.schema)?;
+                    child_properties.push(ExprProperties {
+                        sort_properties: SortProperties::Ordered(next.options),
+                        range: Interval::make_unbounded(&data_type)?,
+                        preserves_lex_ordering: true,
+                    });
                 }
-
-                if all_children_match {
-                    // Check if the expression is monotonic in all arguments
-                    if let Ok(expr_properties) =
-                        equivalent_expr.get_properties(&child_properties)
-                    {
-                        if expr_properties.preserves_lex_ordering
-                            && 
SortProperties::Ordered(leading_ordering_options)
-                                == expr_properties.sort_properties
-                        {
-                            // Assume existing ordering is [c ASC, a ASC, b 
ASC]
-                            // When equality c = f(a,b) is given, if we know 
that given ordering `[a ASC, b ASC]`,
-                            // ordering `[f(a,b) ASC]` is valid, then we can 
deduce that ordering `[a ASC, b ASC]` is also valid.
-                            // Hence, ordering `[a ASC, b ASC]` can be added 
to the state as a valid ordering.
-                            // (e.g. existing ordering where leading ordering 
is removed)
-                            
new_orderings.push(LexOrdering::new(ordering[1..].to_vec()));
-                            break;
-                        }
-                    }
+                // Check if the expression is monotonic in all arguments:
+                let expr_properties =
+                    equivalent_expr.get_properties(&child_properties)?;
+                if expr_properties.preserves_lex_ordering
+                    && expr_properties.sort_properties
+                        == SortProperties::Ordered(leading_ordering_options)
+                {
+                    // Assume that `[c ASC, a ASC, b ASC]` is among existing
+                    // orderings. If equality `c = f(a, b)` is given, ordering
+                    // `[a ASC, b ASC]` implies the ordering `[c ASC]`. Thus,
+                    // ordering `[a ASC, b ASC]` is also a valid ordering.

Review Comment:
   I agree that the example only holds when `f` preserves the lexographic 
ordering of a,b. In fact that I think the check for 
`expr_properties.preserves_lex_ordering`  is doing on line 463 above



##########
datafusion/physical-expr/src/equivalence/properties/mod.rs:
##########
@@ -579,302 +611,289 @@ impl EquivalenceProperties {
             // From the analysis above, we know that `[a ASC]` is satisfied. 
Then,
             // we add column `a` as constant to the algorithm state. This 
enables us
             // to deduce that `(b + c) ASC` is satisfied, given `a` is 
constant.
-            eq_properties = eq_properties.with_constants(std::iter::once(
-                ConstExpr::from(Arc::clone(&normalized_req.expr)),
-            ));
+            let const_expr = ConstExpr::from(element.expr);
+            eq_properties.add_constants(std::iter::once(const_expr))?;
         }
+        Ok(true)
+    }
 
-        // All requirements are satisfied.
-        normalized_reqs.len()
+    /// Returns the number of consecutive sort expressions (starting from the
+    /// left) that are satisfied by the existing ordering.
+    fn common_sort_prefix_length(&self, normal_ordering: LexOrdering) -> 
Result<usize> {
+        let full_length = normal_ordering.len();
+        // Check whether the given ordering is satisfied by constraints:
+        if self.satisfied_by_constraints_ordering(&normal_ordering) {
+            // If constraints satisfy all sort expressions, return the full
+            // length:
+            return Ok(full_length);
+        }
+        let schema = self.schema();
+        let mut eq_properties = self.clone();

Review Comment:
   I think you can avoid this clone by using `Cow` and only clone it if it 
needs to be updated
   ```diff
   @@ -1157,7 +1158,8 @@ impl EquivalenceProperties {
            &self,
            exprs: &[Arc<dyn PhysicalExpr>],
        ) -> Result<(Vec<PhysicalSortExpr>, Vec<usize>)> {
   -        let mut eq_properties = self.clone();
   +        // Avoid cloing `self` if not necessary:
   +        let mut eq_properties = Cow::Borrowed(self);
            let mut result = vec![];
            // The algorithm is as follows:
            // - Iterate over all the expressions and insert ordered expressions
   @@ -1204,7 +1206,7 @@ impl EquivalenceProperties {
                // an implementation strategy confined to this function.
                for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
                    let const_expr = ConstExpr::from(Arc::clone(expr));
   -                eq_properties.add_constants(std::iter::once(const_expr))?;
   +                
eq_properties.to_mut().add_constants(std::iter::once(const_expr))?;
                    search_indices.shift_remove(idx);
                }
                // Add new ordered section to the state.
   ```



##########
datafusion/physical-expr/src/equivalence/properties/mod.rs:
##########
@@ -579,302 +611,289 @@ impl EquivalenceProperties {
             // From the analysis above, we know that `[a ASC]` is satisfied. 
Then,
             // we add column `a` as constant to the algorithm state. This 
enables us
             // to deduce that `(b + c) ASC` is satisfied, given `a` is 
constant.
-            eq_properties = eq_properties.with_constants(std::iter::once(
-                ConstExpr::from(Arc::clone(&normalized_req.expr)),
-            ));
+            let const_expr = ConstExpr::from(element.expr);
+            eq_properties.add_constants(std::iter::once(const_expr))?;
         }
+        Ok(true)
+    }
 
-        // All requirements are satisfied.
-        normalized_reqs.len()
+    /// Returns the number of consecutive sort expressions (starting from the
+    /// left) that are satisfied by the existing ordering.
+    fn common_sort_prefix_length(&self, normal_ordering: LexOrdering) -> 
Result<usize> {
+        let full_length = normal_ordering.len();
+        // Check whether the given ordering is satisfied by constraints:
+        if self.satisfied_by_constraints_ordering(&normal_ordering) {
+            // If constraints satisfy all sort expressions, return the full
+            // length:
+            return Ok(full_length);
+        }
+        let schema = self.schema();
+        let mut eq_properties = self.clone();
+        for (idx, element) in normal_ordering.into_iter().enumerate() {
+            // Check whether given ordering is satisfied:
+            let ExprProperties {
+                sort_properties, ..
+            } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
+            let satisfy = match sort_properties {
+                SortProperties::Ordered(options) => options_compatible(
+                    &options,
+                    &element.options,
+                    element.expr.nullable(schema).unwrap_or(true),
+                ),
+                // Singleton expressions satisfy any ordering.
+                SortProperties::Singleton => true,
+                SortProperties::Unordered => false,
+            };
+            if !satisfy {
+                // As soon as one sort expression is unsatisfied, return how
+                // many we've satisfied so far:
+                return Ok(idx);
+            }
+            // Treat satisfied keys as constants in subsequent iterations. We
+            // can do this because the "next" key only matters in a 
lexicographical
+            // ordering when the keys to its left have the same values.
+            //
+            // Note that these expressions are not properly "constants". This 
is just
+            // an implementation strategy confined to this function.
+            //
+            // For example, assume that the requirement is `[a ASC, (b + c) 
ASC]`,
+            // and existing equivalent orderings are `[a ASC, b ASC]` and `[c 
ASC]`.
+            // From the analysis above, we know that `[a ASC]` is satisfied. 
Then,
+            // we add column `a` as constant to the algorithm state. This 
enables us
+            // to deduce that `(b + c) ASC` is satisfied, given `a` is 
constant.
+            let const_expr = ConstExpr::from(element.expr);
+            eq_properties.add_constants(std::iter::once(const_expr))?
+        }
+        // All sort expressions are satisfied, return full length:
+        Ok(full_length)
     }
 
-    /// Determines the longest prefix of `reqs` that is satisfied by the 
existing ordering.
-    /// Returns that prefix as a new `LexRequirement`, and a boolean 
indicating if all the requirements are satisfied.
+    /// Determines the longest normal prefix of `ordering` satisfied by the
+    /// existing ordering. Returns that prefix as a new `LexOrdering`, and a
+    /// boolean indicating whether all the sort expressions are satisfied.
     pub fn extract_common_sort_prefix(
         &self,
-        reqs: &LexRequirement,
-    ) -> (LexRequirement, bool) {
-        // First, standardize the given requirement:
-        let normalized_reqs = self.normalize_sort_requirements(reqs);
-
-        let prefix_len = 
self.compute_common_sort_prefix_length(&normalized_reqs);
-        (
-            LexRequirement::new(normalized_reqs[..prefix_len].to_vec()),
-            prefix_len == normalized_reqs.len(),
-        )
-    }
-
-    /// Checks whether the given sort requirements are satisfied by any of the
-    /// existing orderings.
-    pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool {
-        self.extract_common_sort_prefix(reqs).1
+        ordering: LexOrdering,
+    ) -> Result<(Vec<PhysicalSortExpr>, bool)> {
+        // First, standardize the given ordering:
+        let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
+            // If the ordering vanishes after normalization, it is satisfied:
+            return Ok((vec![], true));
+        };
+        let prefix_len = 
self.common_sort_prefix_length(normal_ordering.clone())?;
+        let flag = prefix_len == normal_ordering.len();
+        let mut sort_exprs: Vec<_> = normal_ordering.into();
+        if !flag {
+            sort_exprs.truncate(prefix_len);
+        }
+        Ok((sort_exprs, flag))
     }
 
-    /// Checks if the sort requirements are satisfied by any of the table 
constraints (primary key or unique).
-    /// Returns true if any constraint fully satisfies the requirements.
-    fn satisfied_by_constraints(
+    /// Checks if the sort expressions are satisfied by any of the table
+    /// constraints (primary key or unique). Returns true if any constraint
+    /// fully satisfies the expressions (i.e. constraint indices form a valid
+    /// prefix of an existing ordering that matches the expressions). For
+    /// unique constraints, also verifies nullable columns.
+    fn satisfied_by_constraints_ordering(
         &self,
-        normalized_reqs: &[PhysicalSortRequirement],
+        normal_exprs: &[PhysicalSortExpr],
     ) -> bool {
         self.constraints.iter().any(|constraint| match constraint {
-            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => 
self
-                .satisfied_by_constraint(
-                    normalized_reqs,
-                    indices,
-                    matches!(constraint, Constraint::Unique(_)),
-                ),
-        })
-    }
-
-    /// Checks if sort requirements are satisfied by a constraint (primary key 
or unique).
-    /// Returns true if the constraint indices form a valid prefix of an 
existing ordering
-    /// that matches the requirements. For unique constraints, also verifies 
nullable columns.
-    fn satisfied_by_constraint(
-        &self,
-        normalized_reqs: &[PhysicalSortRequirement],
-        indices: &[usize],
-        check_null: bool,
-    ) -> bool {
-        // Requirements must contain indices
-        if indices.len() > normalized_reqs.len() {
-            return false;
-        }
-
-        // Iterate over all orderings
-        self.oeq_class.iter().any(|ordering| {
-            if indices.len() > ordering.len() {
-                return false;
-            }
-
-            // Build a map of column positions in the ordering
-            let mut col_positions = HashMap::with_capacity(ordering.len());
-            for (pos, req) in ordering.iter().enumerate() {
-                if let Some(col) = req.expr.as_any().downcast_ref::<Column>() {
-                    col_positions.insert(
-                        col.index(),
-                        (pos, col.nullable(&self.schema).unwrap_or(true)),
-                    );
-                }
-            }
-
-            // Check if all constraint indices appear in valid positions
-            if !indices.iter().all(|&idx| {
-                col_positions
-                    .get(&idx)
-                    .map(|&(pos, nullable)| {
-                        // For unique constraints, verify column is not 
nullable if it's first/last
-                        !check_null
-                            || (pos != 0 && pos != ordering.len() - 1)
-                            || !nullable
+            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
+                let check_null = matches!(constraint, Constraint::Unique(_));
+                let normalized_size = normal_exprs.len();
+                indices.len() <= normalized_size
+                    && self.oeq_class.iter().any(|ordering| {
+                        let length = ordering.len();
+                        if indices.len() > length || normalized_size < length {
+                            return false;
+                        }
+                        // Build a map of column positions in the ordering:
+                        let mut col_positions = HashMap::with_capacity(length);
+                        for (pos, req) in ordering.iter().enumerate() {
+                            if let Some(col) = 
req.expr.as_any().downcast_ref::<Column>()
+                            {
+                                let nullable = 
col.nullable(&self.schema).unwrap_or(true);
+                                col_positions.insert(col.index(), (pos, 
nullable));
+                            }
+                        }
+                        // Check if all constraint indices appear in valid 
positions:
+                        if !indices.iter().all(|idx| {
+                            col_positions.get(idx).is_some_and(|&(pos, 
nullable)| {
+                                // For unique constraints, verify column is 
not nullable if it's first/last:
+                                !check_null
+                                    || !nullable
+                                    || (pos != 0 && pos != length - 1)
+                            })
+                        }) {
+                            return false;
+                        }
+                        // Check if this ordering matches the prefix:
+                        normal_exprs.iter().zip(ordering).all(|(given, 
existing)| {
+                            existing.satisfy_expr(given, &self.schema)
+                        })
                     })
-                    .unwrap_or(false)
-            }) {
-                return false;
             }
-
-            // Check if this ordering matches requirements prefix
-            let ordering_len = ordering.len();
-            normalized_reqs.len() >= ordering_len
-                && normalized_reqs[..ordering_len].iter().zip(ordering).all(
-                    |(req, existing)| {
-                        req.expr.eq(&existing.expr)
-                            && req
-                                .options
-                                .is_none_or(|req_opts| req_opts == 
existing.options)
-                    },
-                )
         })
     }
 
-    /// Determines whether the ordering specified by the given sort requirement
-    /// is satisfied based on the orderings within, equivalence classes, and
-    /// constant expressions.
-    ///
-    /// # Parameters
-    ///
-    /// - `req`: A reference to a `PhysicalSortRequirement` for which the 
ordering
-    ///   satisfaction check will be done.
-    ///
-    /// # Returns
-    ///
-    /// Returns `true` if the specified ordering is satisfied, `false` 
otherwise.
-    fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool {
-        let ExprProperties {
-            sort_properties, ..
-        } = self.get_expr_properties(Arc::clone(&req.expr));
-        match sort_properties {
-            SortProperties::Ordered(options) => {
-                let sort_expr = PhysicalSortExpr {
-                    expr: Arc::clone(&req.expr),
-                    options,
-                };
-                sort_expr.satisfy(req, self.schema())
+    /// Checks if the sort requirements are satisfied by any of the table
+    /// constraints (primary key or unique). Returns true if any constraint
+    /// fully satisfies the requirements (i.e. constraint indices form a valid
+    /// prefix of an existing ordering that matches the requirements). For
+    /// unique constraints, also verifies nullable columns.
+    fn satisfied_by_constraints(&self, normal_reqs: 
&[PhysicalSortRequirement]) -> bool {
+        self.constraints.iter().any(|constraint| match constraint {
+            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
+                let check_null = matches!(constraint, Constraint::Unique(_));
+                let normalized_size = normal_reqs.len();
+                indices.len() <= normalized_size
+                    && self.oeq_class.iter().any(|ordering| {
+                        let length = ordering.len();
+                        if indices.len() > length || normalized_size < length {
+                            return false;
+                        }
+                        // Build a map of column positions in the ordering:
+                        let mut col_positions = HashMap::with_capacity(length);
+                        for (pos, req) in ordering.iter().enumerate() {
+                            if let Some(col) = 
req.expr.as_any().downcast_ref::<Column>()
+                            {
+                                let nullable = 
col.nullable(&self.schema).unwrap_or(true);
+                                col_positions.insert(col.index(), (pos, 
nullable));
+                            }
+                        }
+                        // Check if all constraint indices appear in valid 
positions:
+                        if !indices.iter().all(|idx| {
+                            col_positions.get(idx).is_some_and(|&(pos, 
nullable)| {
+                                // For unique constraints, verify column is 
not nullable if it's first/last:
+                                !check_null
+                                    || !nullable
+                                    || (pos != 0 && pos != length - 1)
+                            })
+                        }) {
+                            return false;
+                        }
+                        // Check if this ordering matches the prefix:
+                        normal_reqs.iter().zip(ordering).all(|(given, 
existing)| {
+                            existing.satisfy(given, &self.schema)
+                        })
+                    })
             }
-            // Singleton expressions satisfies any ordering.
-            SortProperties::Singleton => true,
-            SortProperties::Unordered => false,
-        }
+        })
     }
 
     /// Checks whether the `given` sort requirements are equal or more specific
     /// than the `reference` sort requirements.
     pub fn requirements_compatible(
         &self,
-        given: &LexRequirement,
-        reference: &LexRequirement,
+        given: LexRequirement,
+        reference: LexRequirement,
     ) -> bool {
-        let normalized_given = self.normalize_sort_requirements(given);
-        let normalized_reference = self.normalize_sort_requirements(reference);
-
-        (normalized_reference.len() <= normalized_given.len())
-            && normalized_reference
+        let Some(normal_given) = self.normalize_sort_requirements(given) else {
+            return true;
+        };
+        let Some(normal_reference) = 
self.normalize_sort_requirements(reference) else {
+            return true;
+        };
+
+        (normal_reference.len() <= normal_given.len())
+            && normal_reference
                 .into_iter()
-                .zip(normalized_given)
+                .zip(normal_given)
                 .all(|(reference, given)| given.compatible(&reference))
     }
 
-    /// Returns the finer ordering among the orderings `lhs` and `rhs`, 
breaking
-    /// any ties by choosing `lhs`.
-    ///
-    /// The finer ordering is the ordering that satisfies both of the 
orderings.
-    /// If the orderings are incomparable, returns `None`.
-    ///
-    /// For example, the finer ordering among `[a ASC]` and `[a ASC, b ASC]` is
-    /// the latter.
-    pub fn get_finer_ordering(
-        &self,
-        lhs: &LexOrdering,
-        rhs: &LexOrdering,
-    ) -> Option<LexOrdering> {
-        // Convert the given sort expressions to sort requirements:
-        let lhs = LexRequirement::from(lhs.clone());
-        let rhs = LexRequirement::from(rhs.clone());
-        let finer = self.get_finer_requirement(&lhs, &rhs);
-        // Convert the chosen sort requirements back to sort expressions:
-        finer.map(LexOrdering::from)
-    }
-
-    /// Returns the finer ordering among the requirements `lhs` and `rhs`,
-    /// breaking any ties by choosing `lhs`.
+    /// Modify existing orderings by substituting sort expressions with 
appropriate
+    /// targets from the projection mapping. We substitute a sort expression 
when
+    /// its physical expression has a one-to-one functional relationship with a
+    /// target expression in the mapping.
     ///
-    /// The finer requirements are the ones that satisfy both of the given
-    /// requirements. If the requirements are incomparable, returns `None`.
+    /// After substitution, we may generate more than one `LexOrdering` for 
each
+    /// existing equivalent ordering. For example, `[a ASC, b ASC]` will turn
+    /// into `[CAST(a) ASC, b ASC]` and `[a ASC, b ASC]` when applying 
projection
+    /// expressions `a, b, CAST(a)`.
     ///
-    /// For example, the finer requirements among `[a ASC]` and `[a ASC, b 
ASC]`
-    /// is the latter.
-    pub fn get_finer_requirement(
-        &self,
-        req1: &LexRequirement,
-        req2: &LexRequirement,
-    ) -> Option<LexRequirement> {
-        let mut lhs = self.normalize_sort_requirements(req1);
-        let mut rhs = self.normalize_sort_requirements(req2);
-        lhs.inner
-            .iter_mut()
-            .zip(rhs.inner.iter_mut())
-            .all(|(lhs, rhs)| {
-                lhs.expr.eq(&rhs.expr)
-                    && match (lhs.options, rhs.options) {
-                        (Some(lhs_opt), Some(rhs_opt)) => lhs_opt == rhs_opt,
-                        (Some(options), None) => {
-                            rhs.options = Some(options);
-                            true
-                        }
-                        (None, Some(options)) => {
-                            lhs.options = Some(options);
-                            true
-                        }
-                        (None, None) => true,
-                    }
-            })
-            .then_some(if lhs.len() >= rhs.len() { lhs } else { rhs })
-    }
-
-    /// we substitute the ordering according to input expression type, this is 
a simplified version
-    /// In this case, we just substitute when the expression satisfy the 
following condition:
-    /// I. just have one column and is a CAST expression
-    /// TODO: Add one-to-ones analysis for monotonic ScalarFunctions.
-    /// TODO: we could precompute all the scenario that is computable, for 
example: atan(x + 1000) should also be substituted if
-    ///  x is DESC or ASC
-    /// After substitution, we may generate more than 1 `LexOrdering`. As an 
example,
-    /// `[a ASC, b ASC]` will turn into `[a ASC, b ASC], [CAST(a) ASC, b ASC]` 
when projection expressions `a, b, CAST(a)` is applied.
-    pub fn substitute_ordering_component(
-        &self,
+    /// TODO: Handle all scenarios that allow substitution; e.g. when `x` is

Review Comment:
   I recommend we file a ticket to track this work and add a link to the ticket 
in the comments



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org


Reply via email to