ozankabak commented on code in PR #16217:
URL: https://github.com/apache/datafusion/pull/16217#discussion_r2127000873


##########
datafusion/physical-expr/src/equivalence/properties/mod.rs:
##########
@@ -579,302 +611,289 @@ impl EquivalenceProperties {
             // From the analysis above, we know that `[a ASC]` is satisfied. Then,
             // we add column `a` as constant to the algorithm state. This enables us
             // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
-            eq_properties = eq_properties.with_constants(std::iter::once(
-                ConstExpr::from(Arc::clone(&normalized_req.expr)),
-            ));
+            let const_expr = ConstExpr::from(element.expr);
+            eq_properties.add_constants(std::iter::once(const_expr))?;
         }
+        Ok(true)
+    }
 
-        // All requirements are satisfied.
-        normalized_reqs.len()
+    /// Returns the number of consecutive sort expressions (starting from the
+    /// left) that are satisfied by the existing ordering.
+    fn common_sort_prefix_length(&self, normal_ordering: LexOrdering) -> Result<usize> {
+        let full_length = normal_ordering.len();
+        // Check whether the given ordering is satisfied by constraints:
+        if self.satisfied_by_constraints_ordering(&normal_ordering) {
+            // If constraints satisfy all sort expressions, return the full
+            // length:
+            return Ok(full_length);
+        }
+        let schema = self.schema();
+        let mut eq_properties = self.clone();
+        for (idx, element) in normal_ordering.into_iter().enumerate() {
+            // Check whether given ordering is satisfied:
+            let ExprProperties {
+                sort_properties, ..
+            } = eq_properties.get_expr_properties(Arc::clone(&element.expr));
+            let satisfy = match sort_properties {
+                SortProperties::Ordered(options) => options_compatible(
+                    &options,
+                    &element.options,
+                    element.expr.nullable(schema).unwrap_or(true),
+                ),
+                // Singleton expressions satisfy any ordering.
+                SortProperties::Singleton => true,
+                SortProperties::Unordered => false,
+            };
+            if !satisfy {
+                // As soon as one sort expression is unsatisfied, return how
+                // many we've satisfied so far:
+                return Ok(idx);
+            }
+            // Treat satisfied keys as constants in subsequent iterations. We
+            // can do this because the "next" key only matters in a lexicographical
+            // ordering when the keys to its left have the same values.
+            //
+            // Note that these expressions are not properly "constants". This is just
+            // an implementation strategy confined to this function.
+            //
+            // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
+            // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
+            // From the analysis above, we know that `[a ASC]` is satisfied. Then,
+            // we add column `a` as constant to the algorithm state. This enables us
+            // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
+            let const_expr = ConstExpr::from(element.expr);
+            eq_properties.add_constants(std::iter::once(const_expr))?
+        }
+        // All sort expressions are satisfied, return full length:
+        Ok(full_length)
     }
 
-    /// Determines the longest prefix of `reqs` that is satisfied by the existing ordering.
-    /// Returns that prefix as a new `LexRequirement`, and a boolean indicating if all the requirements are satisfied.
+    /// Determines the longest normal prefix of `ordering` satisfied by the
+    /// existing ordering. Returns that prefix as a new `LexOrdering`, and a
+    /// boolean indicating whether all the sort expressions are satisfied.
     pub fn extract_common_sort_prefix(
         &self,
-        reqs: &LexRequirement,
-    ) -> (LexRequirement, bool) {
-        // First, standardize the given requirement:
-        let normalized_reqs = self.normalize_sort_requirements(reqs);
-
-        let prefix_len = self.compute_common_sort_prefix_length(&normalized_reqs);
-        (
-            LexRequirement::new(normalized_reqs[..prefix_len].to_vec()),
-            prefix_len == normalized_reqs.len(),
-        )
-    }
-
-    /// Checks whether the given sort requirements are satisfied by any of the
-    /// existing orderings.
-    pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool {
-        self.extract_common_sort_prefix(reqs).1
+        ordering: LexOrdering,
+    ) -> Result<(Vec<PhysicalSortExpr>, bool)> {
+        // First, standardize the given ordering:
+        let Some(normal_ordering) = self.normalize_sort_exprs(ordering) else {
+            // If the ordering vanishes after normalization, it is satisfied:
+            return Ok((vec![], true));
+        };
+        let prefix_len = self.common_sort_prefix_length(normal_ordering.clone())?;
+        let flag = prefix_len == normal_ordering.len();
+        let mut sort_exprs: Vec<_> = normal_ordering.into();
+        if !flag {
+            sort_exprs.truncate(prefix_len);
+        }
+        Ok((sort_exprs, flag))
     }
 
-    /// Checks if the sort requirements are satisfied by any of the table constraints (primary key or unique).
-    /// Returns true if any constraint fully satisfies the requirements.
-    fn satisfied_by_constraints(
+    /// Checks if the sort expressions are satisfied by any of the table
+    /// constraints (primary key or unique). Returns true if any constraint
+    /// fully satisfies the expressions (i.e. constraint indices form a valid
+    /// prefix of an existing ordering that matches the expressions). For
+    /// unique constraints, also verifies nullable columns.
+    fn satisfied_by_constraints_ordering(
         &self,
-        normalized_reqs: &[PhysicalSortRequirement],
+        normal_exprs: &[PhysicalSortExpr],
     ) -> bool {
         self.constraints.iter().any(|constraint| match constraint {
-            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => self
-                .satisfied_by_constraint(
-                    normalized_reqs,
-                    indices,
-                    matches!(constraint, Constraint::Unique(_)),
-                ),
-        })
-    }
-
-    /// Checks if sort requirements are satisfied by a constraint (primary key or unique).
-    /// Returns true if the constraint indices form a valid prefix of an existing ordering
-    /// that matches the requirements. For unique constraints, also verifies nullable columns.
-    fn satisfied_by_constraint(
-        &self,
-        normalized_reqs: &[PhysicalSortRequirement],
-        indices: &[usize],
-        check_null: bool,
-    ) -> bool {
-        // Requirements must contain indices
-        if indices.len() > normalized_reqs.len() {
-            return false;
-        }
-
-        // Iterate over all orderings
-        self.oeq_class.iter().any(|ordering| {
-            if indices.len() > ordering.len() {
-                return false;
-            }
-
-            // Build a map of column positions in the ordering
-            let mut col_positions = HashMap::with_capacity(ordering.len());
-            for (pos, req) in ordering.iter().enumerate() {
-                if let Some(col) = req.expr.as_any().downcast_ref::<Column>() {
-                    col_positions.insert(
-                        col.index(),
-                        (pos, col.nullable(&self.schema).unwrap_or(true)),
-                    );
-                }
-            }
-
-            // Check if all constraint indices appear in valid positions
-            if !indices.iter().all(|&idx| {
-                col_positions
-                    .get(&idx)
-                    .map(|&(pos, nullable)| {
-                        // For unique constraints, verify column is not nullable if it's first/last
-                        !check_null
-                            || (pos != 0 && pos != ordering.len() - 1)
-                            || !nullable
+            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
+                let check_null = matches!(constraint, Constraint::Unique(_));
+                let normalized_size = normal_exprs.len();
+                indices.len() <= normalized_size
+                    && self.oeq_class.iter().any(|ordering| {
+                        let length = ordering.len();
+                        if indices.len() > length || normalized_size < length {
+                            return false;
+                        }
+                        // Build a map of column positions in the ordering:
+                        let mut col_positions = HashMap::with_capacity(length);
+                        for (pos, req) in ordering.iter().enumerate() {
+                            if let Some(col) = req.expr.as_any().downcast_ref::<Column>()
+                            {
+                                let nullable = col.nullable(&self.schema).unwrap_or(true);
+                                col_positions.insert(col.index(), (pos, nullable));
+                            }
+                        }
+                        // Check if all constraint indices appear in valid positions:
+                        if !indices.iter().all(|idx| {
+                            col_positions.get(idx).is_some_and(|&(pos, nullable)| {
+                                // For unique constraints, verify column is not nullable if it's first/last:
+                                !check_null
+                                    || !nullable
+                                    || (pos != 0 && pos != length - 1)
+                            })
+                        }) {
+                            return false;
+                        }
+                        // Check if this ordering matches the prefix:
+                        normal_exprs.iter().zip(ordering).all(|(given, existing)| {
+                            existing.satisfy_expr(given, &self.schema)
+                        })
                     })
-                    .unwrap_or(false)
-            }) {
-                return false;
             }
-
-            // Check if this ordering matches requirements prefix
-            let ordering_len = ordering.len();
-            normalized_reqs.len() >= ordering_len
-                && normalized_reqs[..ordering_len].iter().zip(ordering).all(
-                    |(req, existing)| {
-                        req.expr.eq(&existing.expr)
-                            && req
-                                .options
-                                .is_none_or(|req_opts| req_opts == existing.options)
-                    },
-                )
         })
     }
 
-    /// Determines whether the ordering specified by the given sort requirement
-    /// is satisfied based on the orderings within, equivalence classes, and
-    /// constant expressions.
-    ///
-    /// # Parameters
-    ///
-    /// - `req`: A reference to a `PhysicalSortRequirement` for which the ordering
-    ///   satisfaction check will be done.
-    ///
-    /// # Returns
-    ///
-    /// Returns `true` if the specified ordering is satisfied, `false` otherwise.
-    fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool {
-        let ExprProperties {
-            sort_properties, ..
-        } = self.get_expr_properties(Arc::clone(&req.expr));
-        match sort_properties {
-            SortProperties::Ordered(options) => {
-                let sort_expr = PhysicalSortExpr {
-                    expr: Arc::clone(&req.expr),
-                    options,
-                };
-                sort_expr.satisfy(req, self.schema())
+    /// Checks if the sort requirements are satisfied by any of the table
+    /// constraints (primary key or unique). Returns true if any constraint
+    /// fully satisfies the requirements (i.e. constraint indices form a valid
+    /// prefix of an existing ordering that matches the requirements). For
+    /// unique constraints, also verifies nullable columns.
+    fn satisfied_by_constraints(&self, normal_reqs: &[PhysicalSortRequirement]) -> bool {
+        self.constraints.iter().any(|constraint| match constraint {
+            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => {
+                let check_null = matches!(constraint, Constraint::Unique(_));
+                let normalized_size = normal_reqs.len();
+                indices.len() <= normalized_size
+                    && self.oeq_class.iter().any(|ordering| {
+                        let length = ordering.len();
+                        if indices.len() > length || normalized_size < length {
+                            return false;
+                        }
+                        // Build a map of column positions in the ordering:
+                        let mut col_positions = HashMap::with_capacity(length);
+                        for (pos, req) in ordering.iter().enumerate() {
+                            if let Some(col) = req.expr.as_any().downcast_ref::<Column>()
+                            {
+                                let nullable = col.nullable(&self.schema).unwrap_or(true);
+                                col_positions.insert(col.index(), (pos, nullable));
+                            }
+                        }
+                        // Check if all constraint indices appear in valid positions:
+                        if !indices.iter().all(|idx| {
+                            col_positions.get(idx).is_some_and(|&(pos, nullable)| {
+                                // For unique constraints, verify column is not nullable if it's first/last:
+                                !check_null
+                                    || !nullable
+                                    || (pos != 0 && pos != length - 1)
+                            })
+                        }) {
+                            return false;
+                        }
+                        // Check if this ordering matches the prefix:
+                        normal_reqs.iter().zip(ordering).all(|(given, existing)| {
+                            existing.satisfy(given, &self.schema)
+                        })
+                    })
             }
-            // Singleton expressions satisfies any ordering.
-            SortProperties::Singleton => true,
-            SortProperties::Unordered => false,
-        }
+        })
     }
 
     /// Checks whether the `given` sort requirements are equal or more specific
     /// than the `reference` sort requirements.
     pub fn requirements_compatible(
         &self,
-        given: &LexRequirement,
-        reference: &LexRequirement,
+        given: LexRequirement,
+        reference: LexRequirement,
     ) -> bool {
-        let normalized_given = self.normalize_sort_requirements(given);
-        let normalized_reference = self.normalize_sort_requirements(reference);
-
-        (normalized_reference.len() <= normalized_given.len())
-            && normalized_reference
+        let Some(normal_given) = self.normalize_sort_requirements(given) else {
+            return true;
+        };
+        let Some(normal_reference) = self.normalize_sort_requirements(reference) else {
+            return true;
+        };
+
+        (normal_reference.len() <= normal_given.len())
+            && normal_reference
                 .into_iter()
-                .zip(normalized_given)
+                .zip(normal_given)
                 .all(|(reference, given)| given.compatible(&reference))
     }
 
-    /// Returns the finer ordering among the orderings `lhs` and `rhs`, breaking
-    /// any ties by choosing `lhs`.
-    ///
-    /// The finer ordering is the ordering that satisfies both of the orderings.
-    /// If the orderings are incomparable, returns `None`.
-    ///
-    /// For example, the finer ordering among `[a ASC]` and `[a ASC, b ASC]` is
-    /// the latter.
-    pub fn get_finer_ordering(
-        &self,
-        lhs: &LexOrdering,
-        rhs: &LexOrdering,
-    ) -> Option<LexOrdering> {
-        // Convert the given sort expressions to sort requirements:
-        let lhs = LexRequirement::from(lhs.clone());
-        let rhs = LexRequirement::from(rhs.clone());
-        let finer = self.get_finer_requirement(&lhs, &rhs);
-        // Convert the chosen sort requirements back to sort expressions:
-        finer.map(LexOrdering::from)
-    }
-
-    /// Returns the finer ordering among the requirements `lhs` and `rhs`,
-    /// breaking any ties by choosing `lhs`.
+    /// Modify existing orderings by substituting sort expressions with appropriate
+    /// targets from the projection mapping. We substitute a sort expression when
+    /// its physical expression has a one-to-one functional relationship with a
+    /// target expression in the mapping.
     ///
-    /// The finer requirements are the ones that satisfy both of the given
-    /// requirements. If the requirements are incomparable, returns `None`.
+    /// After substitution, we may generate more than one `LexOrdering` for each
+    /// existing equivalent ordering. For example, `[a ASC, b ASC]` will turn
+    /// into `[CAST(a) ASC, b ASC]` and `[a ASC, b ASC]` when applying projection
+    /// expressions `a, b, CAST(a)`.
     ///
-    /// For example, the finer requirements among `[a ASC]` and `[a ASC, b ASC]`
-    /// is the latter.
-    pub fn get_finer_requirement(
-        &self,
-        req1: &LexRequirement,
-        req2: &LexRequirement,
-    ) -> Option<LexRequirement> {
-        let mut lhs = self.normalize_sort_requirements(req1);
-        let mut rhs = self.normalize_sort_requirements(req2);
-        lhs.inner
-            .iter_mut()
-            .zip(rhs.inner.iter_mut())
-            .all(|(lhs, rhs)| {
-                lhs.expr.eq(&rhs.expr)
-                    && match (lhs.options, rhs.options) {
-                        (Some(lhs_opt), Some(rhs_opt)) => lhs_opt == rhs_opt,
-                        (Some(options), None) => {
-                            rhs.options = Some(options);
-                            true
-                        }
-                        (None, Some(options)) => {
-                            lhs.options = Some(options);
-                            true
-                        }
-                        (None, None) => true,
-                    }
-            })
-            .then_some(if lhs.len() >= rhs.len() { lhs } else { rhs })
-    }
-
-    /// we substitute the ordering according to input expression type, this is a simplified version
-    /// In this case, we just substitute when the expression satisfy the following condition:
-    /// I. just have one column and is a CAST expression
-    /// TODO: Add one-to-ones analysis for monotonic ScalarFunctions.
-    /// TODO: we could precompute all the scenario that is computable, for example: atan(x + 1000) should also be substituted if
-    ///  x is DESC or ASC
-    /// After substitution, we may generate more than 1 `LexOrdering`. As an example,
-    /// `[a ASC, b ASC]` will turn into `[a ASC, b ASC], [CAST(a) ASC, b ASC]` when projection expressions `a, b, CAST(a)` is applied.
-    pub fn substitute_ordering_component(
-        &self,
+    /// TODO: Handle all scenarios that allow substitution; e.g. when `x` is

Review Comment:
   Makes sense, will do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to