alamb commented on code in PR #22002:
URL: https://github.com/apache/datafusion/pull/22002#discussion_r3183693755
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +140,264 @@ impl Display for Partitioning {
.join(", ");
write!(f, "Hash([{phy_exprs_str}], {size})")
}
+ Partitioning::Range(range) => write!(f, "{range}"),
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
}
}
}
+/// Ordered range partitioning for one or more expressions.
+///
+/// Each [`PartitionRange`] describes the value range for one output partition.
+/// Ranges are interpreted lexicographically across [`Self::exprs`]. This type
+/// records the partitioning contract; callers are responsible for constructing
+/// non-overlapping ranges that accurately describe the source.
+#[derive(Debug, Clone)]
+pub struct RangePartitioning {
+ exprs: Vec<Arc<dyn PhysicalExpr>>,
Review Comment:
In general I think it is important to define formally what range
partitioning means -- specifically, given a particular row, which partition it
is in.
As you have this structured, I think more than one range could match the
same row. For example:
```
{
exprs: [a, b]
ranges: [[100-200], [100-200]]
}
```
For a row like (a,b) = (10,20) it would be in both ranges
I think it is more common (e.g. in Spark) to have something like:
1. A single `Expr`
2. A list of ranges
3. An explicit rule that a row is placed in the partition of the FIRST
range that matches it.
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +140,264 @@ impl Display for Partitioning {
.join(", ");
write!(f, "Hash([{phy_exprs_str}], {size})")
}
+ Partitioning::Range(range) => write!(f, "{range}"),
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
}
}
}
+/// Ordered range partitioning for one or more expressions.
+///
+/// Each [`PartitionRange`] describes the value range for one output partition.
+/// Ranges are interpreted lexicographically across [`Self::exprs`]. This type
+/// records the partitioning contract; callers are responsible for constructing
+/// non-overlapping ranges that accurately describe the source.
+#[derive(Debug, Clone)]
+pub struct RangePartitioning {
+ exprs: Vec<Arc<dyn PhysicalExpr>>,
Review Comment:
Also, does it make sense to explicitly require that the ranges don't overlap
and together cover the entire domain of possible values?
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +140,264 @@ impl Display for Partitioning {
.join(", ");
write!(f, "Hash([{phy_exprs_str}], {size})")
}
+ Partitioning::Range(range) => write!(f, "{range}"),
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
}
}
}
+/// Ordered range partitioning for one or more expressions.
+///
+/// Each [`PartitionRange`] describes the value range for one output partition.
+/// Ranges are interpreted lexicographically across [`Self::exprs`]. This type
+/// records the partitioning contract; callers are responsible for constructing
+/// non-overlapping ranges that accurately describe the source.
+#[derive(Debug, Clone)]
+pub struct RangePartitioning {
+ exprs: Vec<Arc<dyn PhysicalExpr>>,
+ ranges: Vec<PartitionRange>,
+}
+
+impl RangePartitioning {
+ /// Create a new [`RangePartitioning`].
+ ///
+ /// Each bound must have the same arity as `exprs`.
+ pub fn try_new(
+ exprs: Vec<Arc<dyn PhysicalExpr>>,
+ ranges: Vec<PartitionRange>,
+ ) -> Result<Self> {
+ if exprs.is_empty() {
+ return plan_err!("RangePartitioning requires at least one
expression");
+ }
+ if ranges.is_empty() {
+ return plan_err!("RangePartitioning requires at least one range");
+ }
+
+ for range in &ranges {
+ range.validate(exprs.len())?;
+ }
+
+ Ok(Self { exprs, ranges })
+ }
+
+ /// Expressions whose values determine the partition range.
+ pub fn exprs(&self) -> &[Arc<dyn PhysicalExpr>] {
+ &self.exprs
+ }
+
+ /// Per-partition ranges, in partition index order.
+ pub fn ranges(&self) -> &[PartitionRange] {
+ &self.ranges
+ }
+
+ /// Number of range partitions.
+ pub fn partition_count(&self) -> usize {
+ self.ranges.len()
+ }
+
+ /// Returns how this range partitioning satisfies a hash distribution
+ /// requirement.
+ ///
+ /// A range partitioning satisfies the requirement when all equal values
for
+ /// the required expressions are colocated in one partition. The routing is
+ /// range-based rather than hash-based, but the distribution property is
the
+ /// same property hash joins and grouped aggregations require.
+ pub fn satisfaction(
+ &self,
+ required_exprs: &[Arc<dyn PhysicalExpr>],
+ eq_properties: &EquivalenceProperties,
+ allow_subset: bool,
+ ) -> PartitioningSatisfaction {
+ exprs_satisfy_distribution(
+ &self.exprs,
+ required_exprs,
+ eq_properties,
+ allow_subset,
+ )
+ }
+
+ /// Returns whether this range partitioning has the same partition map as
+ /// another range partitioning.
+ pub fn compatibility(&self, other: &Self) -> PartitioningCompatibility {
+ if !physical_exprs_equal(&self.exprs, &other.exprs) {
+ return PartitioningCompatibility::Incompatible;
+ }
+
+ if self.ranges == other.ranges {
+ PartitioningCompatibility::SamePartitionMap
+ } else {
+ PartitioningCompatibility::SameExpressionsDifferentBounds
+ }
+ }
+
+ fn project(
+ &self,
+ mapping: &ProjectionMapping,
+ input_eq_properties: &EquivalenceProperties,
+ ) -> Self {
+ let exprs = project_partition_exprs(&self.exprs, mapping,
input_eq_properties);
+ Self {
+ exprs,
+ ranges: self.ranges.clone(),
+ }
+ }
+}
+
+impl PartialEq for RangePartitioning {
+ fn eq(&self, other: &Self) -> bool {
+ physical_exprs_equal(&self.exprs, &other.exprs) && self.ranges ==
other.ranges
+ }
+}
+
+impl Display for RangePartitioning {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ let exprs = self
+ .exprs
+ .iter()
+ .map(|e| format!("{e}"))
+ .collect::<Vec<_>>()
+ .join(", ");
+ let ranges = self
+ .ranges
+ .iter()
+ .map(|r| format!("{r}"))
+ .collect::<Vec<_>>()
+ .join(", ");
+ write!(f, "Range([{exprs}], [{ranges}])")
+ }
+}
+
+/// A single partition's lexicographic value range.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PartitionRange {
+ lower: Option<RangeBound>,
+ upper: Option<RangeBound>,
+}
+
+impl PartitionRange {
+ /// Create a range with optional lower and upper bounds.
+ pub fn new(lower: Option<RangeBound>, upper: Option<RangeBound>) -> Self {
+ Self { lower, upper }
+ }
+
+ /// Create an unbounded range.
+ pub fn unbounded() -> Self {
+ Self {
+ lower: None,
+ upper: None,
+ }
+ }
+
+ /// Lower bound, if any.
+ pub fn lower(&self) -> Option<&RangeBound> {
+ self.lower.as_ref()
+ }
+
+ /// Upper bound, if any.
+ pub fn upper(&self) -> Option<&RangeBound> {
+ self.upper.as_ref()
+ }
+
+ fn validate(&self, arity: usize) -> Result<()> {
+ if let Some(lower) = &self.lower {
+ lower.validate(arity)?;
+ }
+ if let Some(upper) = &self.upper {
+ upper.validate(arity)?;
+ }
+ Ok(())
+ }
+}
+
+impl Display for PartitionRange {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match (&self.lower, &self.upper) {
+ (Some(lower), Some(upper)) => write!(f, "{lower}..{upper}"),
+ (Some(lower), None) => write!(f, "{lower}.."),
+ (None, Some(upper)) => write!(f, "..{upper}"),
+ (None, None) => write!(f, ".."),
+ }
+ }
+}
+
+/// A lexicographic range bound.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RangeBound {
Review Comment:
This looks pretty similar to `Interval`:
https://docs.rs/datafusion/latest/datafusion/logical_expr/interval_arithmetic/struct.Interval.html
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]