viirya commented on code in PR #16217: URL: https://github.com/apache/datafusion/pull/16217#discussion_r2122097178
########## datafusion/physical-expr/src/equivalence/class.rs: ########## @@ -175,307 +135,398 @@ impl ConstExpr { } } +impl PartialEq for ConstExpr { + fn eq(&self, other: &Self) -> bool { + self.across_partitions == other.across_partitions && self.expr.eq(&other.expr) + } +} + impl Display for ConstExpr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.expr)?; - match &self.across_partitions { - AcrossPartitions::Heterogeneous => { - write!(f, "(heterogeneous)")?; - } - AcrossPartitions::Uniform(value) => { - if let Some(val) = value { - write!(f, "(uniform: {val})")?; - } else { - write!(f, "(uniform: unknown)")?; - } - } - } - Ok(()) + write!(f, "{}", self.across_partitions) } } impl From<Arc<dyn PhysicalExpr>> for ConstExpr { fn from(expr: Arc<dyn PhysicalExpr>) -> Self { - Self::new(expr) - } -} - -impl From<&Arc<dyn PhysicalExpr>> for ConstExpr { - fn from(expr: &Arc<dyn PhysicalExpr>) -> Self { - Self::new(Arc::clone(expr)) + // By default, assume constant expressions are not same across partitions. + // However, if we have a literal, it will have a single value that is the + // same across all partitions. + let across = if let Some(lit) = expr.as_any().downcast_ref::<Literal>() { + AcrossPartitions::Uniform(Some(lit.value().clone())) + } else { + AcrossPartitions::Heterogeneous + }; + Self { + expr, + across_partitions: across, + } } } -/// Checks whether `expr` is among in the `const_exprs`. -pub fn const_exprs_contains( - const_exprs: &[ConstExpr], - expr: &Arc<dyn PhysicalExpr>, -) -> bool { - const_exprs - .iter() - .any(|const_expr| const_expr.expr.eq(expr)) -} - /// An `EquivalenceClass` is a set of [`Arc<dyn PhysicalExpr>`]s that are known /// to have the same value for all tuples in a relation. These are generated by /// equality predicates (e.g. `a = b`), typically equi-join conditions and /// equality conditions in filters. /// /// Two `EquivalenceClass`es are equal if they contains the same expressions in /// without any ordering. -#[derive(Debug, Clone)] +#[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct EquivalenceClass { - /// The expressions in this equivalence class. The order doesn't - /// matter for equivalence purposes - /// - exprs: IndexSet<Arc<dyn PhysicalExpr>>, -} - -impl PartialEq for EquivalenceClass { - /// Returns true if other is equal in the sense - /// of bags (multi-sets), disregarding their orderings. - fn eq(&self, other: &Self) -> bool { - self.exprs.eq(&other.exprs) - } + /// The expressions in this equivalence class. The order doesn't matter for + /// equivalence purposes. + pub(crate) exprs: IndexSet<Arc<dyn PhysicalExpr>>, + /// Indicates whether the expressions in this equivalence class have a + /// constant value. A `Some` value indicates constant-ness. + pub(crate) constant: Option<AcrossPartitions>, } impl EquivalenceClass { - /// Create a new empty equivalence class - pub fn new_empty() -> Self { - Self { - exprs: IndexSet::new(), - } - } - - // Create a new equivalence class from a pre-existing `Vec` - pub fn new(exprs: Vec<Arc<dyn PhysicalExpr>>) -> Self { - Self { - exprs: exprs.into_iter().collect(), + // Create a new equivalence class from a pre-existing collection. + pub fn new(exprs: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>) -> Self { + let mut class = Self::default(); + for expr in exprs { + class.push(expr); } - } - - /// Return the inner vector of expressions - pub fn into_vec(self) -> Vec<Arc<dyn PhysicalExpr>> { - self.exprs.into_iter().collect() + class } /// Return the "canonical" expression for this class (the first element) - /// if any - fn canonical_expr(&self) -> Option<Arc<dyn PhysicalExpr>> { - self.exprs.iter().next().cloned() + /// if non-empty. + pub fn canonical_expr(&self) -> Option<&Arc<dyn PhysicalExpr>> { + self.exprs.iter().next() } /// Insert the expression into this class, meaning it is known to be equal to - /// all other expressions in this class + /// all other expressions in this class. pub fn push(&mut self, expr: Arc<dyn PhysicalExpr>) { + if let Some(lit) = expr.as_any().downcast_ref::<Literal>() { + let expr_across = AcrossPartitions::Uniform(Some(lit.value().clone())); + if let Some(across) = self.constant.as_mut() { + // TODO: Return an error if constant values do not agree. + if *across == AcrossPartitions::Heterogeneous { + *across = expr_across; + } Review Comment: It is safer by checking the constant literal values are actually the same if the existing `constant` is `Uniform` and its value is known. ########## datafusion/physical-expr/src/equivalence/class.rs: ########## @@ -175,307 +135,398 @@ impl ConstExpr { } } +impl PartialEq for ConstExpr { + fn eq(&self, other: &Self) -> bool { + self.across_partitions == other.across_partitions && self.expr.eq(&other.expr) + } +} + impl Display for ConstExpr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.expr)?; - match &self.across_partitions { - AcrossPartitions::Heterogeneous => { - write!(f, "(heterogeneous)")?; - } - AcrossPartitions::Uniform(value) => { - if let Some(val) = value { - write!(f, "(uniform: {val})")?; - } else { - write!(f, "(uniform: unknown)")?; - } - } - } - Ok(()) + write!(f, "{}", self.across_partitions) } } impl From<Arc<dyn PhysicalExpr>> for ConstExpr { fn from(expr: Arc<dyn PhysicalExpr>) -> Self { - Self::new(expr) - } -} - -impl From<&Arc<dyn PhysicalExpr>> for ConstExpr { - fn from(expr: &Arc<dyn PhysicalExpr>) -> Self { - Self::new(Arc::clone(expr)) + // By default, assume constant expressions are not same across partitions. + // However, if we have a literal, it will have a single value that is the + // same across all partitions. + let across = if let Some(lit) = expr.as_any().downcast_ref::<Literal>() { + AcrossPartitions::Uniform(Some(lit.value().clone())) + } else { + AcrossPartitions::Heterogeneous + }; + Self { + expr, + across_partitions: across, + } } } -/// Checks whether `expr` is among in the `const_exprs`. -pub fn const_exprs_contains( - const_exprs: &[ConstExpr], - expr: &Arc<dyn PhysicalExpr>, -) -> bool { - const_exprs - .iter() - .any(|const_expr| const_expr.expr.eq(expr)) -} - /// An `EquivalenceClass` is a set of [`Arc<dyn PhysicalExpr>`]s that are known /// to have the same value for all tuples in a relation. These are generated by /// equality predicates (e.g. `a = b`), typically equi-join conditions and /// equality conditions in filters. /// /// Two `EquivalenceClass`es are equal if they contains the same expressions in /// without any ordering. -#[derive(Debug, Clone)] +#[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct EquivalenceClass { - /// The expressions in this equivalence class. The order doesn't - /// matter for equivalence purposes - /// - exprs: IndexSet<Arc<dyn PhysicalExpr>>, -} - -impl PartialEq for EquivalenceClass { - /// Returns true if other is equal in the sense - /// of bags (multi-sets), disregarding their orderings. - fn eq(&self, other: &Self) -> bool { - self.exprs.eq(&other.exprs) - } + /// The expressions in this equivalence class. The order doesn't matter for + /// equivalence purposes. + pub(crate) exprs: IndexSet<Arc<dyn PhysicalExpr>>, + /// Indicates whether the expressions in this equivalence class have a + /// constant value. A `Some` value indicates constant-ness. + pub(crate) constant: Option<AcrossPartitions>, } impl EquivalenceClass { - /// Create a new empty equivalence class - pub fn new_empty() -> Self { - Self { - exprs: IndexSet::new(), - } - } - - // Create a new equivalence class from a pre-existing `Vec` - pub fn new(exprs: Vec<Arc<dyn PhysicalExpr>>) -> Self { - Self { - exprs: exprs.into_iter().collect(), + // Create a new equivalence class from a pre-existing collection. + pub fn new(exprs: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>) -> Self { + let mut class = Self::default(); + for expr in exprs { + class.push(expr); } - } - - /// Return the inner vector of expressions - pub fn into_vec(self) -> Vec<Arc<dyn PhysicalExpr>> { - self.exprs.into_iter().collect() + class } /// Return the "canonical" expression for this class (the first element) - /// if any - fn canonical_expr(&self) -> Option<Arc<dyn PhysicalExpr>> { - self.exprs.iter().next().cloned() + /// if non-empty. + pub fn canonical_expr(&self) -> Option<&Arc<dyn PhysicalExpr>> { + self.exprs.iter().next() } /// Insert the expression into this class, meaning it is known to be equal to - /// all other expressions in this class + /// all other expressions in this class. pub fn push(&mut self, expr: Arc<dyn PhysicalExpr>) { + if let Some(lit) = expr.as_any().downcast_ref::<Literal>() { + let expr_across = AcrossPartitions::Uniform(Some(lit.value().clone())); + if let Some(across) = self.constant.as_mut() { + // TODO: Return an error if constant values do not agree. + if *across == AcrossPartitions::Heterogeneous { Review Comment: Why a new `Uniform` constant overwrites existing `Heterogeneous`? Shouldn't it be an error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org