alamb commented on code in PR #11652:
URL: https://github.com/apache/datafusion/pull/11652#discussion_r1693622103
##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -428,6 +428,18 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
+
+ /// Returns `true` if a limit can be safely pushed down through this
+ /// `ExecutionPlan` node.
+ fn supports_limit_pushdown(&self) -> bool {
Review Comment:
I found this name somewhat confusing, as it implied to me that it was reporting whether
`with_fetch` was implemented; now I see it refers to whether it is OK to push
a limit through the node.
Perhaps we could rename it to something different, such as `can_push_limit`
or `preserves_limit`?
I don't feel strongly about this
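To make the distinction concrete, here is a minimal toy sketch (not DataFusion code; the `Node` struct, its fields, and the `ProjectionExec` check are all illustrative): `with_fetch` produces a fetching variant of the node itself, while `supports_limit_pushdown` reports whether a limit may move *through* the node to its input.

```rust
// Hypothetical sketch contrasting the two capabilities; `Node` and its
// behavior are illustrative, not the real `ExecutionPlan` API.

struct Node {
    name: &'static str,
    fetch: Option<usize>,
}

impl Node {
    /// "Fetching variant": the same node, now emitting at most `limit` rows
    /// itself. This is what `with_fetch` provides.
    fn with_fetch(&self, limit: Option<usize>) -> Node {
        Node { name: self.name, fetch: limit }
    }

    /// Pushdown capability: whether a limit sitting *above* this node may
    /// safely move *through* it to its input. Independent of `with_fetch`.
    fn supports_limit_pushdown(&self) -> bool {
        // Assumed here: only row-preserving nodes such as projections opt in.
        self.name == "ProjectionExec"
    }
}
```

A rename such as `preserves_limit` would name only the second capability and keep the two ideas from being conflated.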
##########
datafusion/sqllogictest/test_files/explain.slt:
##########
@@ -79,10 +79,7 @@ logical_plan
01)Limit: skip=0, fetch=10
02)--Sort: aggregate_test_100_with_order.c1 ASC NULLS LAST, fetch=10
03)----TableScan: aggregate_test_100_with_order projection=[c1]
-physical_plan
-01)GlobalLimitExec: skip=0, fetch=10
-02)--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_test_100_order_by_c1_asc.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
-
+physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_test_100_order_by_c1_asc.csv]]}, projection=[c1], limit=10, output_ordering=[c1@0 ASC NULLS LAST], has_header=true
Review Comment:
The `GlobalLimitExec` is not needed here because the `CsvExec` already has a
limit and there is a single partition ✅
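For illustration, a toy sketch of the partition-count reasoning (the `rows_after_source_limit` helper is hypothetical, not DataFusion code): each partition applies the source's limit independently, so the source limit alone caps total output only when there is a single partition; with several partitions a global limit would still be needed.

```rust
// Illustrative model: every partition independently truncates its output to
// `limit` rows, and total output is the sum across partitions.
fn rows_after_source_limit(partition_row_counts: &[usize], limit: usize) -> usize {
    partition_row_counts.iter().map(|&n| n.min(limit)).sum()
}
```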
##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -428,6 +428,18 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
+
+ /// Returns `true` if a limit can be safely pushed down through this
+ /// `ExecutionPlan` node.
+ fn supports_limit_pushdown(&self) -> bool {
+ false
+ }
+
+ /// Returns a fetching variant of this `ExecutionPlan` node, if it supports
Review Comment:
💯
##########
datafusion/core/src/physical_optimizer/limit_pushdown.rs:
##########
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This rule reduces the amount of data transferred by pushing down limits as much as possible.
Review Comment:
This is the summary that is rendered on the rustdoc page, so it helps to
put a link to the actual struct.
Something like
```suggestion
//! [`LimitPushdown`] pushes `LIMIT` down in `ExecutionPlan`s to reduce data transfer
```
##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -428,6 +428,18 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
+
+ /// Returns `true` if a limit can be safely pushed down through this
+ /// `ExecutionPlan` node.
Review Comment:
I think it might help to also explain the implications of returning `true`.
Something like
```suggestion
/// `ExecutionPlan` node.
///
/// If this method returns true, and the query contains a `FETCH`/`LIMIT`
/// at the output of this node, DataFusion will push the Fetch/Limit to
/// the input of this node
```
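A toy mirror of this opt-in pattern may make the implications concrete (illustrative `Operator` trait and optimizer step, not the real `ExecutionPlan` API): the method defaults to `false`, and the optimizer only moves a limit below nodes that explicitly opt in.

```rust
trait Operator {
    /// Default: not known to be safe to push a limit through this node.
    fn supports_limit_pushdown(&self) -> bool {
        false
    }
}

/// A projection computes expressions row by row, so limiting its output
/// is equivalent to limiting its input; it opts in.
struct Projection;
impl Operator for Projection {
    fn supports_limit_pushdown(&self) -> bool {
        true
    }
}

/// A filter may drop rows, so limiting its input could yield fewer output
/// rows than requested; it keeps the default `false`.
struct Filter;
impl Operator for Filter {}

/// Simplified optimizer step: the limit moves below `op` only when allowed.
fn push_limit_through(op: &dyn Operator, limit: usize) -> Option<usize> {
    if op.supports_limit_pushdown() {
        Some(limit)
    } else {
        None
    }
}
```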
##########
datafusion/sqllogictest/test_files/explain.slt:
##########
@@ -304,9 +300,7 @@ CREATE EXTERNAL TABLE alltypes_plain STORED AS PARQUET LOCATION '../../parquet-t
query TT
EXPLAIN SELECT * FROM alltypes_plain limit 10;
----
-physical_plan
-01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
Review Comment:
I agree this is a better plan 👍; since it has only a single partition, the
global limit is unnecessary
##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -160,6 +176,16 @@ impl ExecutionPlan for CoalesceBatchesExec {
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}
+
+ fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Review Comment:
It might help to document somewhere (perhaps in the struct's documentation) why
setting `limit` is valuable on `CoalesceBatchesExec` -- in this case I think
it is because `CoalesceBatchesExec` buffers data, so it may buffer more than
necessary while waiting for a limit to complete
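A toy sketch of that buffering argument (the `coalesce` function is hypothetical, not `CoalesceBatchesExec`'s real code): without a fetch, the operator keeps buffering rows toward its target batch size even past what a downstream limit will consume; with a fetch it can flush and stop as soon as the limit is satisfied.

```rust
/// Coalesce small input batches into batches of `target` rows, stopping
/// early once `fetch` rows (if set) would be satisfied.
fn coalesce(batches: &[Vec<i64>], target: usize, fetch: Option<usize>) -> Vec<Vec<i64>> {
    let mut out = Vec::new();
    let mut buf: Vec<i64> = Vec::new();
    let mut emitted = 0usize;
    for batch in batches {
        for &row in batch {
            if let Some(f) = fetch {
                if emitted + buf.len() >= f {
                    // The limit is already satisfied: flush and stop
                    // buffering instead of filling up to `target`.
                    if !buf.is_empty() {
                        out.push(std::mem::take(&mut buf));
                    }
                    return out;
                }
            }
            buf.push(row);
            if buf.len() == target {
                emitted += buf.len();
                out.push(std::mem::take(&mut buf));
            }
        }
    }
    if !buf.is_empty() {
        out.push(buf);
    }
    out
}
```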
##########
datafusion/sqllogictest/test_files/group_by.slt:
##########
@@ -4334,8 +4335,9 @@ physical_plan
01)GlobalLimitExec: skip=0, fetch=5
02)--SortPreservingMergeExec: [name@0 DESC,time_chunks@1 DESC], fetch=5
03)----ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks]
-04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
-05)--------StreamingTableExec: partition_sizes=1, projection=[name, ts], infinite_source=true, output_ordering=[name@0 DESC, ts@1 DESC]
+04)------LocalLimitExec: fetch=5
+05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
Review Comment:
I wonder why the limit wasn't pushed into the `RepartitionExec` here 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]