This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c919054241 perf: short-circuit and collect_bool for IN list with
column references (#20694)
c919054241 is described below
commit c9190542413c0642cca47129b4830b074924bdd3
Author: Zhang Xiaofeng <[email protected]>
AuthorDate: Thu Mar 5 23:13:58 2026 +0800
perf: short-circuit and collect_bool for IN list with column references
(#20694)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Closes #20428.
## Rationale for this change
Third PR in the IN list optimization series (split from #20428):
- PR1: benchmarks (#20444, merged)
- PR2: Arrow vectorized eq kernel (#20528, merged)
- **PR3 (this): short-circuit, collect_bool, and first-expr
initialization**
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
## What changes are included in this PR?
- **Short-circuit break**: convert `try_fold` to a `for` loop; when every
row is already `true` and no row is null, skip the remaining list items
(up to 27x faster for match=100%/nulls=0%)
- **`BooleanBuffer::collect_bool`**: use in the `make_comparator` fallback
path for nested types instead of `(0..n).map().collect()` (suggested by
@Dandandan in #20428 )
- **First-expr initialization**: evaluate the first list expression
directly as the accumulator, avoiding a redundant `or_kleene(all_false,
rhs)` (suggested by @Dandandan in #20428 )
- **Tests**: added 3 new tests covering short-circuit, short-circuit
with nulls, and struct column references (make_comparator fallback path)
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
## Are these changes tested?
Yes. New tests were added to cover short-circuit, short-circuit with nulls,
and struct column references (the make_comparator fallback path).
Benchmark result:
```
(zhangxffff) zhangxffff@95d3d60664da ~/W/datafusion ((bcc52cd4))> critcmp
after before
group after
before
----- -----
------
in_list_cols/Int32/list=28/match=0%/nulls=0% 1.02 93.8±1.80µs
? ?/sec 1.00 91.8±1.52µs ? ?/sec
in_list_cols/Int32/list=28/match=0%/nulls=20% 1.03 105.3±1.95µs
? ?/sec 1.00 102.2±1.59µs ? ?/sec
in_list_cols/Int32/list=28/match=100%/nulls=0% 1.00 3.4±0.07µs
? ?/sec 27.14 91.7±1.52µs ? ?/sec
in_list_cols/Int32/list=28/match=100%/nulls=20% 1.07 107.7±1.91µs
? ?/sec 1.00 100.4±1.33µs ? ?/sec
in_list_cols/Int32/list=28/match=50%/nulls=0% 1.00 50.1±1.15µs
? ?/sec 1.84 92.4±1.36µs ? ?/sec
in_list_cols/Int32/list=28/match=50%/nulls=20% 1.05 105.1±1.49µs
? ?/sec 1.00 100.0±0.84µs ? ?/sec
in_list_cols/Int32/list=3/match=0%/nulls=0% 1.00 9.9±0.17µs
? ?/sec 1.01 10.1±0.19µs ? ?/sec
in_list_cols/Int32/list=3/match=0%/nulls=20% 1.02 11.0±0.18µs
? ?/sec 1.00 10.8±0.16µs ? ?/sec
in_list_cols/Int32/list=3/match=100%/nulls=0% 1.00 3.3±0.06µs
? ?/sec 2.95 9.9±0.16µs ? ?/sec
in_list_cols/Int32/list=3/match=100%/nulls=20% 1.01 10.9±0.19µs
? ?/sec 1.00 10.8±0.09µs ? ?/sec
in_list_cols/Int32/list=3/match=50%/nulls=0% 1.00 10.0±0.17µs
? ?/sec 1.00 9.9±0.18µs ? ?/sec
in_list_cols/Int32/list=3/match=50%/nulls=20% 1.05 11.3±0.24µs
? ?/sec 1.00 10.8±0.11µs ? ?/sec
in_list_cols/Int32/list=8/match=0%/nulls=0% 1.02 26.7±0.58µs
? ?/sec 1.00 26.2±0.50µs ? ?/sec
in_list_cols/Int32/list=8/match=0%/nulls=20% 1.04 29.6±0.57µs
? ?/sec 1.00 28.5±0.45µs ? ?/sec
in_list_cols/Int32/list=8/match=100%/nulls=0% 1.00 3.4±0.05µs
? ?/sec 7.78 26.2±0.36µs ? ?/sec
in_list_cols/Int32/list=8/match=100%/nulls=20% 1.05 30.0±0.65µs
? ?/sec 1.00 28.7±0.55µs ? ?/sec
in_list_cols/Int32/list=8/match=50%/nulls=0% 1.03 26.7±0.59µs
? ?/sec 1.00 26.0±0.37µs ? ?/sec
in_list_cols/Int32/list=8/match=50%/nulls=20% 1.04 29.9±0.57µs
? ?/sec 1.00 28.7±0.46µs ? ?/sec
in_list_cols/Utf8/list=28/match=0% 1.17 155.0±2.44µs
? ?/sec 1.00 132.8±2.97µs ? ?/sec
in_list_cols/Utf8/list=28/match=100% 1.02 726.6±14.54µs
? ?/sec 1.00 712.4±9.09µs ? ?/sec
in_list_cols/Utf8/list=28/match=50% 1.02 1070.1±13.06µs
? ?/sec 1.00 1051.8±8.17µs ? ?/sec
in_list_cols/Utf8/list=3/match=0% 1.14 16.4±0.37µs
? ?/sec 1.00 14.4±0.22µs ? ?/sec
in_list_cols/Utf8/list=3/match=100% 1.02 68.0±1.29µs
? ?/sec 1.00 66.5±0.99µs ? ?/sec
in_list_cols/Utf8/list=3/match=50% 1.15 107.6±2.05µs
? ?/sec 1.00 93.6±1.88µs ? ?/sec
in_list_cols/Utf8/list=8/match=0% 1.16 44.0±0.61µs
? ?/sec 1.00 37.9±0.95µs ? ?/sec
in_list_cols/Utf8/list=8/match=100% 1.00 190.4±2.71µs
? ?/sec 1.03 195.7±2.01µs ? ?/sec
in_list_cols/Utf8/list=8/match=50% 1.03 295.9±4.45µs
? ?/sec 1.00 287.3±3.26µs ? ?/sec
```
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
---------
Co-authored-by: Adrian Garcia Badaracco
<[email protected]>
---
.../physical-expr/src/expressions/in_list.rs | 264 ++++++++++++++++-----
1 file changed, 209 insertions(+), 55 deletions(-)
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs
b/datafusion/physical-expr/src/expressions/in_list.rs
index 44a6572f53..e30f256352 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -793,63 +793,71 @@ impl PhysicalExpr for InListExpr {
// comparator for unsupported types (nested, RunEndEncoded,
etc.).
let value = value.into_array(num_rows)?;
let lhs_supports_arrow_eq =
supports_arrow_eq(value.data_type());
- let found = self.list.iter().map(|expr|
expr.evaluate(batch)).try_fold(
- BooleanArray::new(BooleanBuffer::new_unset(num_rows),
None),
- |result, expr| -> Result<BooleanArray> {
- let rhs = match expr? {
- ColumnarValue::Array(array) => {
- if lhs_supports_arrow_eq
- && supports_arrow_eq(array.data_type())
- {
- arrow_eq(&value, &array)?
- } else {
- let cmp = make_comparator(
- value.as_ref(),
- array.as_ref(),
- SortOptions::default(),
- )?;
- (0..num_rows)
- .map(|i| {
- if value.is_null(i) ||
array.is_null(i) {
- return None;
- }
- Some(cmp(i, i).is_eq())
- })
- .collect::<BooleanArray>()
- }
+
+ // Helper: compare value against a single list expression
+ let compare_one = |expr: &Arc<dyn PhysicalExpr>| ->
Result<BooleanArray> {
+ match expr.evaluate(batch)? {
+ ColumnarValue::Array(array) => {
+ if lhs_supports_arrow_eq
+ && supports_arrow_eq(array.data_type())
+ {
+ Ok(arrow_eq(&value, &array)?)
+ } else {
+ let cmp = make_comparator(
+ value.as_ref(),
+ array.as_ref(),
+ SortOptions::default(),
+ )?;
+ let buffer =
BooleanBuffer::collect_bool(num_rows, |i| {
+ cmp(i, i).is_eq()
+ });
+ let nulls =
+ NullBuffer::union(value.nulls(),
array.nulls());
+ Ok(BooleanArray::new(buffer, nulls))
}
- ColumnarValue::Scalar(scalar) => {
- // Check if scalar is null once, before the
loop
- if scalar.is_null() {
- // If scalar is null, all comparisons
return null
- BooleanArray::from(vec![None; num_rows])
- } else if lhs_supports_arrow_eq {
- let scalar_datum = scalar.to_scalar()?;
- arrow_eq(&value, &scalar_datum)?
- } else {
- // Convert scalar to 1-element array
- let array = scalar.to_array()?;
- let cmp = make_comparator(
- value.as_ref(),
- array.as_ref(),
- SortOptions::default(),
- )?;
- // Compare each row of value with the
single scalar element
- (0..num_rows)
- .map(|i| {
- if value.is_null(i) {
- None
- } else {
- Some(cmp(i, 0).is_eq())
- }
- })
- .collect::<BooleanArray>()
- }
+ }
+ ColumnarValue::Scalar(scalar) => {
+ // Check if scalar is null once, before the loop
+ if scalar.is_null() {
+ // If scalar is null, all comparisons return
null
+ Ok(BooleanArray::from(vec![None; num_rows]))
+ } else if lhs_supports_arrow_eq {
+ let scalar_datum = scalar.to_scalar()?;
+ Ok(arrow_eq(&value, &scalar_datum)?)
+ } else {
+ // Convert scalar to 1-element array
+ let array = scalar.to_array()?;
+ let cmp = make_comparator(
+ value.as_ref(),
+ array.as_ref(),
+ SortOptions::default(),
+ )?;
+ // Compare each row of value with the single
scalar element
+ let buffer =
BooleanBuffer::collect_bool(num_rows, |i| {
+ cmp(i, 0).is_eq()
+ });
+ Ok(BooleanArray::new(buffer,
value.nulls().cloned()))
}
- };
- Ok(or_kleene(&result, &rhs)?)
- },
- )?;
+ }
+ }
+ };
+
+ // Evaluate first expression directly to avoid a redundant
+ // or_kleene with an all-false accumulator.
+ let mut found = if let Some(first) = self.list.first() {
+ compare_one(first)?
+ } else {
+ BooleanArray::new(BooleanBuffer::new_unset(num_rows), None)
+ };
+
+ for expr in self.list.iter().skip(1) {
+ // Short-circuit: if every non-null row is already true,
+ // no further list items can change the result.
+ if found.null_count() == 0 && found.true_count() ==
num_rows {
+ break;
+ }
+ found = or_kleene(&found, &compare_one(expr)?)?;
+ }
if self.negated { not(&found)? } else { found }
}
@@ -3724,4 +3732,150 @@ mod tests {
assert_eq!(result, &BooleanArray::from(vec![true, false, false]));
Ok(())
}
+
+ /// Tests that short-circuit evaluation produces correct results.
+ /// When all rows match after the first list item, remaining items
+ /// should be skipped without affecting correctness.
+ #[test]
+ fn test_in_list_with_columns_short_circuit() -> Result<()> {
+ // a IN (b, c) where b already matches every row of a
+ // The short-circuit should skip evaluating c
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(Int32Array::from(vec![1, 2, 3])), // b == a for all
rows
+ Arc::new(Int32Array::from(vec![99, 99, 99])),
+ ],
+ )?;
+
+ let col_a = col("a", &schema)?;
+ let list = vec![col("b", &schema)?, col("c", &schema)?];
+ let expr = make_in_list_with_columns(col_a, list, false);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ assert_eq!(result, &BooleanArray::from(vec![true, true, true]));
+ Ok(())
+ }
+
+ /// Short-circuit must NOT skip when nulls are present (three-valued
logic).
+ /// Even if all non-null values are true, null rows keep the result as
null.
+ #[test]
+ fn test_in_list_with_columns_short_circuit_with_nulls() -> Result<()> {
+ // a IN (b, c) where a has nulls
+ // Even if b matches all non-null rows, result should preserve nulls
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![
+ Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])),
+ Arc::new(Int32Array::from(vec![1, 2, 3])), // matches non-null
rows
+ Arc::new(Int32Array::from(vec![99, 99, 99])),
+ ],
+ )?;
+
+ let col_a = col("a", &schema)?;
+ let list = vec![col("b", &schema)?, col("c", &schema)?];
+ let expr = make_in_list_with_columns(col_a, list, false);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ // row 0: 1 IN (1, 99) → true
+ // row 1: NULL IN (2, 99) → NULL
+ // row 2: 3 IN (3, 99) → true
+ assert_eq!(
+ result,
+ &BooleanArray::from(vec![Some(true), None, Some(true)])
+ );
+ Ok(())
+ }
+
+ /// Tests the make_comparator + collect_bool fallback path using
+ /// struct column references (nested types don't support arrow_eq).
+ #[test]
+ fn test_in_list_with_columns_struct() -> Result<()> {
+ let struct_fields = Fields::from(vec![
+ Field::new("x", DataType::Int32, false),
+ Field::new("y", DataType::Utf8, false),
+ ]);
+ let struct_dt = DataType::Struct(struct_fields.clone());
+
+ let schema = Schema::new(vec![
+ Field::new("a", struct_dt.clone(), true),
+ Field::new("b", struct_dt.clone(), false),
+ Field::new("c", struct_dt.clone(), false),
+ ]);
+
+ // a: [{1,"a"}, {2,"b"}, NULL, {4,"d"}]
+ // b: [{1,"a"}, {9,"z"}, {3,"c"}, {4,"d"}]
+ // c: [{9,"z"}, {2,"b"}, {9,"z"}, {9,"z"}]
+ let a = Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
+ Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
+ ],
+ Some(vec![true, true, false, true].into()),
+ ));
+ let b = Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 9, 3, 4])),
+ Arc::new(StringArray::from(vec!["a", "z", "c", "d"])),
+ ],
+ None,
+ ));
+ let c = Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![9, 2, 9, 9])),
+ Arc::new(StringArray::from(vec!["z", "b", "z", "z"])),
+ ],
+ None,
+ ));
+
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b,
c])?;
+
+ let col_a = col("a", &schema)?;
+ let list = vec![col("b", &schema)?, col("c", &schema)?];
+ let expr = make_in_list_with_columns(col_a, list, false);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ // row 0: {1,"a"} IN ({1,"a"}, {9,"z"}) → true (matches b)
+ // row 1: {2,"b"} IN ({9,"z"}, {2,"b"}) → true (matches c)
+ // row 2: NULL IN ({3,"c"}, {9,"z"}) → NULL
+ // row 3: {4,"d"} IN ({4,"d"}, {9,"z"}) → true (matches b)
+ assert_eq!(
+ result,
+ &BooleanArray::from(vec![Some(true), Some(true), None, Some(true)])
+ );
+
+ // Also test NOT IN
+ let col_a = col("a", &schema)?;
+ let list = vec![col("b", &schema)?, col("c", &schema)?];
+ let expr = make_in_list_with_columns(col_a, list, true);
+
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ // row 0: {1,"a"} NOT IN ({1,"a"}, {9,"z"}) → false
+ // row 1: {2,"b"} NOT IN ({9,"z"}, {2,"b"}) → false
+ // row 2: NULL NOT IN ({3,"c"}, {9,"z"}) → NULL
+ // row 3: {4,"d"} NOT IN ({4,"d"}, {9,"z"}) → false
+ assert_eq!(
+ result,
+ &BooleanArray::from(vec![Some(false), Some(false), None,
Some(false)])
+ );
+ Ok(())
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]