alamb commented on code in PR #11652:
URL: https://github.com/apache/datafusion/pull/11652#discussion_r1693622103


##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -428,6 +428,18 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     fn statistics(&self) -> Result<Statistics> {
         Ok(Statistics::new_unknown(&self.schema()))
     }
+
+    /// Returns `true` if a limit can be safely pushed down through this
+    /// `ExecutionPlan` node.
+    fn supports_limit_pushdown(&self) -> bool {

Review Comment:
   I found this name somewhat confusing: it implied to me that it reports whether 
`with_fetch` is implemented, whereas I now see it refers to whether it is OK to 
push a limit through the node
   
   Perhaps we could rename it to something different, such as `can_push_limit` 
or `preserves_limit`?
   
   I don't feel strongly about this



##########
datafusion/sqllogictest/test_files/explain.slt:
##########
@@ -79,10 +79,7 @@ logical_plan
 01)Limit: skip=0, fetch=10
 02)--Sort: aggregate_test_100_with_order.c1 ASC NULLS LAST, fetch=10
 03)----TableScan: aggregate_test_100_with_order projection=[c1]
-physical_plan
-01)GlobalLimitExec: skip=0, fetch=10
-02)--CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_test_100_order_by_c1_asc.csv]]},
 projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
-
+physical_plan CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_test_100_order_by_c1_asc.csv]]},
 projection=[c1], limit=10, output_ordering=[c1@0 ASC NULLS LAST], 
has_header=true

Review Comment:
   The `GlobalLimitExec` is not needed here because the `CsvExec` already has a 
limit and there is a single partition ✅ 
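
A minimal sketch of the reasoning in this comment, as a toy model rather than DataFusion code. It assumes each partition applies the fetch locally, and both function names are hypothetical:

```rust
/// Total rows produced when every partition applies `fetch` on its own,
/// which is what a scan with `limit=` does in this toy model.
fn rows_after_local_fetch(partition_rows: &[usize], fetch: usize) -> usize {
    partition_rows.iter().map(|&rows| rows.min(fetch)).sum()
}

/// A global limit is only needed when the locally limited output can
/// still exceed `fetch`. (Hypothetical helper, not a DataFusion API.)
fn needs_global_limit(partition_rows: &[usize], fetch: usize) -> bool {
    rows_after_local_fetch(partition_rows, fetch) > fetch
}
```

With a single partition the local fetch already caps the output at `fetch` rows, so nothing is left for a global limit to do. With several partitions each one may contribute up to `fetch` rows, so the combined output can exceed the limit.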



##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -428,6 +428,18 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     fn statistics(&self) -> Result<Statistics> {
         Ok(Statistics::new_unknown(&self.schema()))
     }
+
+    /// Returns `true` if a limit can be safely pushed down through this
+    /// `ExecutionPlan` node.
+    fn supports_limit_pushdown(&self) -> bool {
+        false
+    }
+
+    /// Returns a fetching variant of this `ExecutionPlan` node, if it supports

Review Comment:
   💯 



##########
datafusion/core/src/physical_optimizer/limit_pushdown.rs:
##########
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This rule reduces the amount of data transferred by pushing down limits as 
much as possible.

Review Comment:
   This is the summary that is rendered on the rustdocs page, so it helps to 
put a link to the actual struct
   
   Something like
   
   ```suggestion
   //! [`LimitPushdown`] pushes `LIMIT` down in `ExecutionPlan`s to reduce data 
transfer
   ```



##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -428,6 +428,18 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     fn statistics(&self) -> Result<Statistics> {
         Ok(Statistics::new_unknown(&self.schema()))
     }
+
+    /// Returns `true` if a limit can be safely pushed down through this
+    /// `ExecutionPlan` node.

Review Comment:
   I think it might help to also explain the implications of returning true. 
Something like
   
   ```suggestion
       /// `ExecutionPlan` node.
       ///
       /// If this method returns true, and the query contains a `FETCH`/`LIMIT`
       /// at the output of this node, DataFusion will push the Fetch/Limit to 
       /// the input of this node
   ```
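
To illustrate how the two hooks could interact, here is a minimal sketch on a toy plan tree. The `Plan` enum and `push_limit` function are hypothetical stand-ins, not the `ExecutionPlan` trait or the rule in this PR, and the sketch ignores partitioning and `skip`:

```rust
/// Toy stand-in for a physical plan node (not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Leaf source that can absorb a fetch, like a file scan with `limit=`.
    Scan { fetch: Option<usize> },
    /// Row-preserving node: a limit above it may move below it.
    Projection { input: Box<Plan> },
    /// Row-dropping node: a limit must stay above it.
    Filter { input: Box<Plan> },
    /// Explicit limit operator.
    Limit { fetch: usize, input: Box<Plan> },
}

impl Plan {
    /// Modelled on the hook under review: may a limit pass through this node?
    fn supports_limit_pushdown(&self) -> bool {
        matches!(self, Plan::Projection { .. })
    }

    /// Modelled on `with_fetch`: a fetching variant of this node, if any.
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Plan> {
        match self {
            Plan::Scan { .. } => Some(Plan::Scan { fetch }),
            _ => None,
        }
    }
}

/// Pushes `fetch` as far down the tree as the two hooks allow.
fn push_limit(plan: Plan, fetch: usize) -> Plan {
    // Case 1: the node can absorb the fetch itself.
    if let Some(fetching) = plan.with_fetch(Some(fetch)) {
        return fetching;
    }
    // Case 2: the node lets the limit through, so recurse into its input.
    let can_push = plan.supports_limit_pushdown();
    match plan {
        Plan::Projection { input } if can_push => Plan::Projection {
            input: Box::new(push_limit(*input, fetch)),
        },
        // Case 3: the limit has to stay above this node.
        other => Plan::Limit { fetch, input: Box::new(other) },
    }
}
```

In this model a limit above `Projection -> Scan` ends up inside the scan, while a limit above `Filter -> Scan` stays where it is, because the filter may drop rows after the fetch has been applied.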



##########
datafusion/sqllogictest/test_files/explain.slt:
##########
@@ -304,9 +300,7 @@ CREATE EXTERNAL TABLE alltypes_plain STORED AS PARQUET 
LOCATION '../../parquet-t
 query TT
 EXPLAIN SELECT * FROM alltypes_plain limit 10;
 ----
-physical_plan
-01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-02)--ParquetExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
statistics=[Rows=Exact(8), Bytes=Absent, 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+physical_plan ParquetExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
statistics=[Rows=Exact(8), Bytes=Absent, 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]

Review Comment:
   I agree this is a better plan 👍: as it has only a single partition, the 
global limit is unnecessary



##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -160,6 +176,16 @@ impl ExecutionPlan for CoalesceBatchesExec {
     fn statistics(&self) -> Result<Statistics> {
         self.input.statistics()
     }
+
+    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn 
ExecutionPlan>> {

Review Comment:
   It might help to document somewhere (perhaps the struct's documentation) why 
setting `limit` is valuable on `CoalesceBatchesExec` -- in this case I think 
it is because `CoalesceBatchesExec` buffers data, so it may buffer more than 
necessary while waiting for a limit to complete
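
A minimal sketch of the buffering concern, using a toy coalescer over plain integers. `Coalescer` and its fields are hypothetical and not the `CoalesceBatchesExec` implementation:

```rust
/// Toy batch coalescer: buffers small batches until `target` rows are ready.
/// `fetch` is a hypothetical row cap standing in for the limit under review.
struct Coalescer {
    target: usize,
    fetch: Option<usize>,
    buffered: Vec<u32>,
    emitted: usize,
}

impl Coalescer {
    fn new(target: usize, fetch: Option<usize>) -> Self {
        Self { target, fetch, buffered: Vec::new(), emitted: 0 }
    }

    /// Rows still allowed by the fetch, or `usize::MAX` when there is none.
    fn remaining(&self) -> usize {
        self.fetch.map_or(usize::MAX, |f| f - self.emitted)
    }

    /// Buffers `batch` and returns an output batch once either the target
    /// size or the fetch has been reached.
    fn push(&mut self, batch: &[u32]) -> Option<Vec<u32>> {
        let remaining = self.remaining();
        if remaining == 0 {
            return None; // fetch already satisfied, nothing more to emit
        }
        self.buffered.extend_from_slice(batch);
        if self.buffered.len() >= remaining {
            // The fetch is satisfied: emit now instead of waiting for `target`.
            self.buffered.truncate(remaining);
        } else if self.buffered.len() < self.target {
            return None; // keep buffering
        }
        let out = std::mem::take(&mut self.buffered);
        self.emitted += out.len();
        Some(out)
    }
}
```

Without a fetch, the coalescer holds a three-row batch until eight rows have accumulated. With `fetch = Some(2)` it can emit two rows immediately, which is the early exit a downstream limit benefits from.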



##########
datafusion/sqllogictest/test_files/group_by.slt:
##########
@@ -4334,8 +4335,9 @@ physical_plan
 01)GlobalLimitExec: skip=0, fetch=5
 02)--SortPreservingMergeExec: [name@0 DESC,time_chunks@1 DESC], fetch=5
 03)----ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { 
months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks]
-04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
-05)--------StreamingTableExec: partition_sizes=1, projection=[name, ts], 
infinite_source=true, output_ordering=[name@0 DESC, ts@1 DESC]
+04)------LocalLimitExec: fetch=5
+05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1

Review Comment:
   I wonder why the limit wasn't pushed into the `RepartitionExec` here 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

