gene-bordegaray commented on code in PR #22207:
URL: https://github.com/apache/datafusion/pull/22207#discussion_r3261586069
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +136,225 @@ impl Display for Partitioning {
.join(", ");
write!(f, "Hash([{phy_exprs_str}], {size})")
}
+ Partitioning::Range(range) => write!(f, "{range}"),
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
}
}
}
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes range bounds over one or more physical
+/// expressions. Each [`RangePartition`] represents one output partition and
must
+/// contain exactly one [`RangeInterval`] for each partition expression.
+///
+/// The source declaring this partitioning is responsible for ensuring that,
for
+/// every emitted row, the row belongs to exactly one partition and is emitted
by
+/// that partition. The declared ranges do not need to cover values that the
plan
+/// cannot emit.
+///
+/// Each lower and upper bound explicitly records whether it is inclusive.
+/// Unbounded sides are represented with `None`, bound values should be
non-null
+/// until null routing semantics are defined.
+///
+/// For example, a scan can declare date and city range partitions as:
+///
+/// ```text
+/// exprs = [date, city]
+///
+/// partition 0:
+/// date in [2021-01-01, 2022-01-01)
+/// city in [Allston, Boston)
+///
+/// partition 1:
+/// date in [2021-01-01, 2022-01-01)
+/// city in [Boston, NYC)
+/// ```
+///
+/// NOTE: Optimizer and execution behavior for this partitioning is
intentionally
+/// not implemented and will be introduced incrementally. This public API keeps
+/// the partition ranges explicit for users. Repartitioning may compile the
same
+/// metadata into a more efficient internal router.
+#[derive(Debug, Clone)]
+pub struct RangePartitioning {
+ partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
+ partitions: Vec<RangePartition>,
+}
+
+impl RangePartitioning {
+ /// Creates range partitioning metadata.
+ ///
+ /// The caller is responsible for ensuring each partition has one range per
+ /// partition expression and for satisfying the contract documented on
+ /// [`RangePartitioning`].
+ pub fn new(
+ partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
+ partitions: Vec<RangePartition>,
+ ) -> Self {
+ Self {
+ partition_exprs,
+ partitions,
+ }
+ }
+
+ /// Returns the partition expressions.
+ pub fn partition_exprs(&self) -> &[Arc<dyn PhysicalExpr>] {
+ &self.partition_exprs
+ }
+
+ /// Returns the declared range partitions.
+ pub fn partitions(&self) -> &[RangePartition] {
+ &self.partitions
+ }
+
+ /// Returns the number of partitions.
+ pub fn partition_count(&self) -> usize {
+ self.partitions.len()
+ }
+
+ fn project(
+ &self,
+ mapping: &ProjectionMapping,
+ input_eq_properties: &EquivalenceProperties,
+ ) -> Option<Self> {
+ let partition_exprs = input_eq_properties
+ .project_expressions(&self.partition_exprs, mapping)
+ .collect::<Option<Vec<_>>>()?;
+
+ Some(Self {
+ partition_exprs,
+ partitions: self.partitions.clone(),
+ })
+ }
+}
+
+impl Display for RangePartitioning {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ let partitions = self
+ .partitions
+ .iter()
+ .map(|partition| format!("{partition}"))
+ .collect::<Vec<_>>()
+ .join(", ");
+ write!(
+ f,
+ "Range({}, [{}], {})",
+ format_physical_expr_list(&self.partition_exprs),
+ partitions,
+ self.partition_count()
+ )
+ }
+}
+
+impl PartialEq for RangePartitioning {
+ fn eq(&self, other: &Self) -> bool {
+ physical_exprs_equal(&self.partition_exprs, &other.partition_exprs)
+ && self.partitions == other.partitions
+ }
+}
+
+/// Ranges for one output partition in a [`RangePartitioning`].
+#[derive(Debug, Clone, PartialEq)]
+pub struct RangePartition {
+ ranges: Vec<RangeInterval>,
+}
+
+impl RangePartition {
+ /// Creates a partition from one range per partition expression.
+ pub fn new(ranges: Vec<RangeInterval>) -> Self {
+ Self { ranges }
+ }
+
+ /// Returns the ranges for this partition.
+ pub fn ranges(&self) -> &[RangeInterval] {
+ &self.ranges
+ }
+}
+
+impl Display for RangePartition {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ let ranges = self
+ .ranges
+ .iter()
+ .map(|range| format!("{range}"))
+ .collect::<Vec<_>>()
+ .join(", ");
+ write!(f, "({ranges})")
+ }
+}
+
+/// A scalar interval in one range partition dimension.
+#[derive(Debug, Clone, PartialEq)]
+pub struct RangeInterval {
+ lower: Option<RangeBound>,
+ upper: Option<RangeBound>,
Review Comment:
The reason I leaned toward this was readability. I think we could make the
documentation clear or even provide helpers to abstract this nicely so I am not
concerned with this.
I am ok with dong split points as well as long as other maintainers think
this is ok for public API 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]