ozankabak commented on code in PR #11652:
URL: https://github.com/apache/datafusion/pull/11652#discussion_r1692893313
##########
datafusion/core/src/physical_optimizer/limit_pushdown.rs:
##########
@@ -166,37 +154,111 @@ fn extract_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<LimitExec> {
}
}
-/// Merge the limits of the parent and the child. The resulting [`ExecutionPlan`]
-/// is the same as the parent.
-fn try_merge_limits(
- limit_exec: &LimitExec,
- limit: &LocalLimitExec,
-) -> Result<Arc<dyn ExecutionPlan>> {
- let parent_skip = limit_exec.skip();
- let parent_fetch = limit_exec.fetch();
- let parent_max = parent_fetch.map(|f| f + parent_skip);
- let child_fetch = limit.fetch();
-
- if let Some(parent_max) = parent_max {
- if child_fetch >= parent_max {
- // Child fetch is larger than or equal to parent max, so we can remove the child
- Ok(limit_exec.with_child(limit.input().clone()).into())
- } else if child_fetch > parent_skip {
- // Child fetch is larger than parent skip, so we can trim the parent fetch
- Ok(limit_exec
- .with_child(limit.input().clone())
- .with_fetch(child_fetch - parent_skip)
- .into())
- } else {
- // This would return an empty result
- Err(plan_datafusion_err!("Child fetch is less than parent skip"))
+/// Merge the limits of the parent and the child. If at least one of them is a
+/// [`GlobalLimitExec`], the result is also a [`GlobalLimitExec`]. Otherwise,
+/// the result is a [`LocalLimitExec`].
+fn merge_limits(
+ parent_limit_exec: &LimitExec,
+ child_limit_exec: &LimitExec,
+) -> Arc<dyn ExecutionPlan> {
+ match (parent_limit_exec, child_limit_exec) {
+ (LimitExec::Local(parent), LimitExec::Local(child)) => {
+ // Simply use the smaller fetch
+ Arc::new(LocalLimitExec::new(
+ child_limit_exec.input().clone(),
+ parent.fetch().min(child.fetch()),
+ ))
+ }
+ _ => {
+ // We can use the same logic as push_down_limit from the logical plan optimizer
+ let (skip, fetch) = combine_limit(
+ parent_limit_exec.skip(),
+ parent_limit_exec.fetch(),
+ child_limit_exec.skip(),
+ child_limit_exec.fetch(),
+ );
+
+ Arc::new(GlobalLimitExec::new(
+ child_limit_exec.input().clone(),
+ skip,
+ fetch,
+ ))
}
- } else {
- // Parent's fetch is infinite, use child's
- Ok(limit_exec.with_fetch(child_fetch).into())
}
}
+/// Combines two limits into a single limit.
+///
+/// Returns the combined limit `(skip, fetch)`
+///
+/// # Case 0: Parent and Child are disjoint. (`child_fetch <= skip`)
+///
+/// ```text
+/// Before merging:
+/// |........skip........|---fetch-->| Parent Limit
+/// |...child_skip...|---child_fetch-->| Child Limit
+/// ```
+///
+/// After merging:
+/// ```text
+/// |.........(child_skip + skip).........|
+/// ```
+///
+/// Before merging:
+/// ```text
+/// |...skip...|------------fetch------------>| Parent Limit
+/// |...child_skip...|-------------child_fetch------------>| Child Limit
+/// ```
+///
+/// After merging:
+/// ```text
+/// |....(child_skip + skip)....|---(child_fetch - skip)-->|
+/// ```
+///
+/// # Case 1: Parent is beyond the range of Child. (`skip < child_fetch <= skip + fetch`)
+///
+/// Before merging:
+/// ```text
+/// |...skip...|------------fetch------------>| Parent Limit
+/// |...child_skip...|-------------child_fetch------------>| Child Limit
+/// ```
+///
+/// After merging:
+/// ```text
+/// |....(child_skip + skip)....|---(child_fetch - skip)-->|
+/// ```
+///
+/// # Case 2: Parent is in the range of Child. (`skip + fetch < child_fetch`)
+/// Before merging:
+/// ```text
+/// |...skip...|---fetch-->| Parent Limit
+/// |...child_skip...|-------------child_fetch------------>| Child Limit
+/// ```
+///
+/// After merging:
+/// ```text
+/// |....(child_skip + skip)....|---fetch-->|
+/// ```
Review Comment:
```suggestion
/// Computes the `skip` and `fetch` parameters of a single limit that would be
/// equivalent to two consecutive limits with the given `skip`/`fetch` parameters.
///
/// There are multiple cases to consider:
///
/// # Case 0: Parent and child are disjoint (`child_fetch <= skip`).
///
/// Before merging:
/// ```text
/// |........skip........|---fetch-->| Parent limit
/// |...child_skip...|---child_fetch-->| Child limit
/// ```
///
/// After merging:
/// ```text
/// |.........(child_skip + skip).........|
/// ```
///
/// # Case 1: Parent extends beyond child's range (`skip < child_fetch <= skip + fetch`).
///
/// Before merging:
/// ```text
/// |...skip...|------------fetch------------>| Parent limit
/// |...child_skip...|-------------child_fetch------------>| Child limit
/// ```
///
/// After merging:
/// ```text
/// |....(child_skip + skip)....|---(child_fetch - skip)-->|
/// ```
///
/// # Case 2: Parent is within child's range (`skip + fetch < child_fetch`).
///
/// Before merging:
/// ```text
/// |...skip...|---fetch-->| Parent limit
/// |...child_skip...|-------------child_fetch------------>| Child limit
/// ```
///
/// After merging:
/// ```text
/// |....(child_skip + skip)....|---fetch-->|
/// ```
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]