This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0f093f405c Implement cardinality_effect for window execs and UnionExec (#20321)
0f093f405c is described below
commit 0f093f405c399df8673298a4ecc6814f0823ea9e
Author: Namgung Chan <[email protected]>
AuthorDate: Thu Mar 5 07:28:55 2026 +0900
Implement cardinality_effect for window execs and UnionExec (#20321)
## Which issue does this PR close?
- Closes #20291.
## Rationale for this change
`WindowAggExec` and `BoundedWindowAggExec` did not implement
`cardinality_effect`, so both fell back to the default value `Unknown`.
Both operators preserve row cardinality:
- They evaluate window expressions per input row and append result
columns.
- They do not filter out rows.
- They do not duplicate rows.
So their cardinality effect is `Equal`.
This PR also updates `UnionExec`, which combines rows from multiple
children, so its output has at least as many rows as any single child.
Its cardinality effect should therefore be `GreaterEqual` instead of
defaulting to `Unknown`.
## What changes are included in this PR?
- Implement `cardinality_effect` for `WindowAggExec` as
`CardinalityEffect::Equal`.
- Implement `cardinality_effect` for `BoundedWindowAggExec` as
`CardinalityEffect::Equal`.
- Implement `cardinality_effect` for `UnionExec` as
`CardinalityEffect::GreaterEqual`.
## Are these changes tested?
Yes. Unit tests were added for `UnionExec`, `WindowAggExec`, and
`BoundedWindowAggExec`.
## Are there any user-facing changes?
No.
## Additional note
I used a coding agent for implementation/PR drafting and reviewed the
changes myself. If this conflicts with project policy, please let me
know.
---
datafusion/physical-plan/src/union.rs | 31 +++++++++++++-
.../src/windows/bounded_window_agg_exec.rs | 24 +++++++++++
.../physical-plan/src/windows/window_agg_exec.rs | 50 +++++++++++++++++++++-
3 files changed, 102 insertions(+), 3 deletions(-)
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 5c4f821c98..168048295d 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -34,8 +34,8 @@ use super::{
 };
 use crate::check_if_same_properties;
 use crate::execution_plan::{
-    InvariantLevel, boundedness_from_children, check_default_invariants,
-    emission_type_from_children,
+    CardinalityEffect, InvariantLevel, boundedness_from_children,
+    check_default_invariants, emission_type_from_children,
 };
 use crate::filter::FilterExec;
 use crate::filter_pushdown::{
@@ -360,6 +360,12 @@ impl ExecutionPlan for UnionExec {
         }
     }
 
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        // Union combines rows from multiple inputs, so output rows are not tied
+        // to any single input and can only be constrained as greater-or-equal.
+        CardinalityEffect::GreaterEqual
+    }
+
     fn supports_limit_pushdown(&self) -> bool {
         true
     }
@@ -1210,4 +1216,25 @@ mod tests {
             )
         );
     }
+
+    #[test]
+    fn test_union_cardinality_effect() -> Result<()> {
+        let schema = create_test_schema()?;
+        let input1: Arc<dyn ExecutionPlan> =
+            Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
+        let input2: Arc<dyn ExecutionPlan> =
+            Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
+
+        let union = UnionExec::try_new(vec![input1, input2])?;
+        let union = union
+            .as_any()
+            .downcast_ref::<UnionExec>()
+            .expect("expected UnionExec for multiple inputs");
+
+        assert!(matches!(
+            union.cardinality_effect(),
+            CardinalityEffect::GreaterEqual
+        ));
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 0a3d592776..f589b4d748 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -66,6 +66,7 @@ use datafusion_physical_expr_common::sort_expr::{
     OrderingRequirements, PhysicalSortExpr,
 };
 
+use crate::execution_plan::CardinalityEffect;
 use ahash::RandomState;
 use futures::stream::Stream;
 use futures::{StreamExt, ready};
@@ -398,6 +399,10 @@ impl ExecutionPlan for BoundedWindowAggExec {
         let input_stat = self.input.partition_statistics(partition)?;
         self.statistics_helper(input_stat)
     }
+
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        CardinalityEffect::Equal
+    }
 }
 
 /// Trait that specifies how we search for (or calculate) partitions. It has two
@@ -1266,6 +1271,7 @@ mod tests {
     use std::time::Duration;
 
     use crate::common::collect;
+    use crate::execution_plan::CardinalityEffect;
     use crate::expressions::PhysicalSortExpr;
     use crate::projection::{ProjectionExec, ProjectionExpr};
     use crate::streaming::{PartitionStream, StreamingTableExec};
@@ -1850,4 +1856,22 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_bounded_window_agg_cardinality_effect() -> Result<()> {
+        let schema = test_schema();
+        let input: Arc<dyn ExecutionPlan> =
+            Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
+        let plan = bounded_window_exec_pb_latent_range(input, 1, "hash", "sn")?;
+        let plan = plan
+            .as_any()
+            .downcast_ref::<BoundedWindowAggExec>()
+            .expect("expected BoundedWindowAggExec");
+
+        assert!(matches!(
+            plan.cardinality_effect(),
+            CardinalityEffect::Equal
+        ));
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index 01710d5b3c..f71f1cbfe6 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use super::utils::create_schema;
-use crate::execution_plan::EmissionType;
+use crate::execution_plan::{CardinalityEffect, EmissionType};
 use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use crate::windows::{
     calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs,
@@ -315,6 +315,10 @@ impl ExecutionPlan for WindowAggExec {
             total_byte_size: Precision::Absent,
         })
     }
+
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        CardinalityEffect::Equal
+    }
 }
 
 /// Compute the window aggregate columns
@@ -464,3 +468,47 @@ impl RecordBatchStream for WindowAggStream {
         Arc::clone(&self.schema)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test::TestMemoryExec;
+    use crate::windows::create_window_expr;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+    use datafusion_expr::{
+        WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
+    };
+    use datafusion_functions_aggregate::count::count_udaf;
+
+    #[test]
+    fn test_window_agg_cardinality_effect() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
+        let input: Arc<dyn ExecutionPlan> =
+            Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?);
+        let args = vec![crate::expressions::col("a", &schema)?];
+        let window_expr = create_window_expr(
+            &WindowFunctionDefinition::AggregateUDF(count_udaf()),
+            "count(a)".to_string(),
+            &args,
+            &[],
+            &[],
+            Arc::new(WindowFrame::new_bounds(
+                WindowFrameUnits::Rows,
+                WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
+                WindowFrameBound::CurrentRow,
+            )),
+            Arc::clone(&schema),
+            false,
+            false,
+            None,
+        )?;
+
+        let window = WindowAggExec::try_new(vec![window_expr], input, true)?;
+        assert!(matches!(
+            window.cardinality_effect(),
+            CardinalityEffect::Equal
+        ));
+        Ok(())
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]