berkaysynnada commented on code in PR #14207:
URL: https://github.com/apache/datafusion/pull/14207#discussion_r1966888998
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1021,21 +1043,25 @@ fn remove_dist_changing_operators(
fn replace_order_preserving_variants(
mut context: DistributionContext,
) -> Result<DistributionContext> {
- context.children = context
- .children
- .into_iter()
- .map(|child| {
- if child.data {
- replace_order_preserving_variants(child)
- } else {
- Ok(child)
- }
- })
- .collect::<Result<Vec<_>>>()?;
+ let mut children = vec![];
+ let mut fetch = None;
+ for child in context.children.into_iter() {
+ if child.data.has_dist_changing {
+ let mut child = replace_order_preserving_variants(child)?;
+ fetch = child.data.fetch.take();
+ children.push(child);
+ } else {
+ children.push(child);
+ }
+ }
+ context.children = children;
if is_sort_preserving_merge(&context.plan) {
+ // Keep the fetch value of the SortPreservingMerge operator, maybe it
will be used later.
+ let fetch = context.plan.fetch();
let child_plan = Arc::clone(&context.children[0].plan);
context.plan = Arc::new(CoalescePartitionsExec::new(child_plan));
+ context.data.fetch = fetch;
Review Comment:
Don't we need to set the fetch for all operators that have one? If so, let's do that before these if/else blocks — that would also eliminate line 1078.
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3172,3 +3181,78 @@ fn optimize_away_unnecessary_repartition2() ->
Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn apply_enforce_distribution_multiple_times() -> Result<()> {
Review Comment:
Besides this test, a data test in the .slt files would still be helpful, IMO.
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3154,3 +3164,104 @@ fn optimize_away_unnecessary_repartition2() ->
Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn apply_enforce_distribution_multiple_times() -> Result<()> {
+ // Create a configuration
+ let config = SessionConfig::new();
+ let ctx = SessionContext::new_with_config(config);
+ let testdata = datafusion::test_util::arrow_test_data();
+ let csv_file = format!("{testdata}/csv/aggregate_test_100.csv");
+ // Create table schema and data
+ let sql = format!(
+ "CREATE EXTERNAL TABLE aggregate_test_100 (
+ c1 VARCHAR NOT NULL,
+ c2 TINYINT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT,
+ c5 INT,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 BIGINT UNSIGNED NOT NULL,
+ c10 VARCHAR NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+ )
+ STORED AS CSV
+ LOCATION '{csv_file}'
+ OPTIONS ('format.has_header' 'true')"
+ );
+
+ ctx.sql(sql.as_str()).await?;
+
+ let df = ctx.sql("SELECT * FROM(SELECT * FROM aggregate_test_100 UNION ALL
SELECT * FROM aggregate_test_100) ORDER BY c13 LIMIT 5").await?;
+ let logical_plan = df.logical_plan().clone();
+ let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
+ logical_plan,
+ ctx.state().config_options(),
+ |_, _| (),
+ )?;
+ let optimized_logical_plan = ctx.state().optimizer().optimize(
+ analyzed_logical_plan,
+ &ctx.state(),
+ |_, _| (),
+ )?;
+
+ let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+ Arc::new(OutputRequirements::new_add_mode()),
+ Arc::new(EnforceDistribution::new()),
+ Arc::new(EnforceSorting::new()),
+ Arc::new(ProjectionPushdown::new()),
+ Arc::new(CoalesceBatches::new()),
+ Arc::new(EnforceDistribution::new()), // -- Add enforce distribution
rule again
+ // The second `EnforceDistribution` should be run before removing
`OutputRequirements` to reproduce the bug.
+ Arc::new(OutputRequirements::new_remove_mode()),
+ Arc::new(ProjectionPushdown::new()),
+ Arc::new(LimitPushdown::new()),
+ Arc::new(SanityCheckPlan::new()),
Review Comment:
https://github.com/apache/datafusion/pull/14207/files#r1959171412
If that's the case, why don't we just pass only the minimal set of rules needed to reproduce the issue?
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -987,16 +1003,22 @@ fn add_spm_on_top(input: DistributionContext) ->
DistributionContext {
fn remove_dist_changing_operators(
mut distribution_context: DistributionContext,
) -> Result<DistributionContext> {
+ let mut fetch = None;
while is_repartition(&distribution_context.plan)
|| is_coalesce_partitions(&distribution_context.plan)
|| is_sort_preserving_merge(&distribution_context.plan)
{
+ if is_sort_preserving_merge(&distribution_context.plan) {
Review Comment:
Why do we limit the fetch extraction to only the sort-preserving merge
operator?
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1361,22 +1390,67 @@ pub fn ensure_distribution(
} else {
plan.with_new_children(children_plans)?
};
+ let mut optimized_distribution_ctx =
+ DistributionContext::new(Arc::clone(&plan), data.clone(), children);
+
+ // If `fetch` was not consumed, it means that there was
`SortPreservingMergeExec` with fetch before
Review Comment:
Are we sure the fetch belongs to the sort-preserving merge? Even if that is
so at the moment, what will happen if more operators with fetch capability
emerge?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]