jonathanc-n commented on code in PR #16023:
URL: https://github.com/apache/datafusion/pull/16023#discussion_r2150948098


##########
datafusion/optimizer/src/eliminate_self_join/unique_keyed.rs:
##########
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EliminateUniqueKeyedSelfJoin`] eliminates self joins on unique 
constraint columns
+
+use crate::{ApplyOrder, OptimizerConfig, OptimizerRule};
+use datafusion_common::{
+    tree_node::{Transformed, TreeNode, TreeNodeRecursion},
+    DFSchema, Result, TableReference,
+};
+use datafusion_expr::{
+    Expr, Join, JoinType, LogicalPlan, LogicalPlanBuilder, Projection, 
SubqueryAlias,
+    TableScan,
+};
+use indexmap::IndexSet;
+
+use super::{
+    is_table_scan_same, merge_table_scans, unique_indexes, OptimizationResult,
+    RenamedAlias,
+};
+
+#[derive(Default, Debug)]
+pub struct EliminateUniqueKeyedSelfJoin;
+
+impl EliminateUniqueKeyedSelfJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// Optimize self-join query by combining LHS and RHS of the join. Current 
implementation is
+/// very conservative. It only merges nodes if one of them is `TableScan`. It 
should be possible
+/// to merge projections and filters together as well.
+///
+/// TLDR; of current implementation is
+/// - If LHS and RHS is `TableScan`, then merge table scans,
+/// - If LHS is `TableScan` and RHS isn't `TableScan`, then find `TableScan` 
on RHS and merge them
+/// - If LHS isn't `TableScan` and RHS is `TableScan` recursively call 
`optimize` with children swapped
+/// - If LHS and RHS is `SubqueryAlias`, recursively call `optimize` with 
their input
+fn optimize(left: &LogicalPlan, right: &LogicalPlan) -> 
Option<OptimizationResult> {
+    match (left, right) {
+        (LogicalPlan::TableScan(left_scan), 
LogicalPlan::TableScan(right_scan)) => {
+            let table_scan = merge_table_scans(left_scan, right_scan);
+            let plan = LogicalPlan::TableScan(table_scan)
+                .recompute_schema()
+                .unwrap();
+            Some(OptimizationResult {
+                plan,
+                renamed_alias: None,
+            })
+        }
+        (
+            LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input: left_input,
+                alias: left_alias,
+                ..
+            }),
+            LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input: right_input,
+                alias: right_alias,
+                ..
+            }),
+        ) => {
+            let OptimizationResult {
+                plan,
+                renamed_alias,
+            } = optimize(left_input, right_input)?;
+            assert!(renamed_alias.is_none(), "Assert `renamed_alias` is `None` 
because nested `SubqueryAlias` shouldn't be possible");
+
+            let plan = LogicalPlanBuilder::new(plan)
+                .alias(left_alias.clone())
+                .unwrap()
+                .build()
+                .unwrap();
+            let plan = plan.recompute_schema().unwrap();
+            Some(OptimizationResult {
+                plan,
+                renamed_alias: Some(RenamedAlias {
+                    from: right_alias.clone(),
+                    to: left_alias.clone(),
+                }),
+            })
+        }
+        (LogicalPlan::TableScan(left_scan), _) => {
+            let transformed = right
+                .clone()
+                .transform_up(|plan| match &plan {
+                    LogicalPlan::TableScan(right_scan) => {
+                        let merged = merge_table_scans(left_scan, right_scan);
+                        Ok(Transformed::yes(LogicalPlan::TableScan(merged)))
+                    }
+                    _ => Ok(Transformed::no(plan)),
+                })
+                .unwrap();
+            assert!(

Review Comment:
   ditto



##########
datafusion/optimizer/src/eliminate_self_join/unique_keyed.rs:
##########
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EliminateUniqueKeyedSelfJoin`] eliminates self joins on unique 
constraint columns
+
+use crate::{ApplyOrder, OptimizerConfig, OptimizerRule};
+use datafusion_common::{
+    tree_node::{Transformed, TreeNode, TreeNodeRecursion},
+    DFSchema, Result, TableReference,
+};
+use datafusion_expr::{
+    Expr, Join, JoinType, LogicalPlan, LogicalPlanBuilder, Projection, 
SubqueryAlias,
+    TableScan,
+};
+use indexmap::IndexSet;
+
+use super::{
+    is_table_scan_same, merge_table_scans, unique_indexes, OptimizationResult,
+    RenamedAlias,
+};
+
+#[derive(Default, Debug)]
+pub struct EliminateUniqueKeyedSelfJoin;
+
+impl EliminateUniqueKeyedSelfJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// Optimize self-join query by combining LHS and RHS of the join. Current 
implementation is
+/// very conservative. It only merges nodes if one of them is `TableScan`. It 
should be possible
+/// to merge projections and filters together as well.
+///
+/// TLDR; of current implementation is
+/// - If LHS and RHS is `TableScan`, then merge table scans,
+/// - If LHS is `TableScan` and RHS isn't `TableScan`, then find `TableScan` 
on RHS and merge them
+/// - If LHS isn't `TableScan` and RHS is `TableScan` recursively call 
`optimize` with children swapped
+/// - If LHS and RHS is `SubqueryAlias`, recursively call `optimize` with 
their input
+fn optimize(left: &LogicalPlan, right: &LogicalPlan) -> 
Option<OptimizationResult> {
+    match (left, right) {
+        (LogicalPlan::TableScan(left_scan), 
LogicalPlan::TableScan(right_scan)) => {
+            let table_scan = merge_table_scans(left_scan, right_scan);
+            let plan = LogicalPlan::TableScan(table_scan)
+                .recompute_schema()
+                .unwrap();
+            Some(OptimizationResult {
+                plan,
+                renamed_alias: None,
+            })
+        }
+        (
+            LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input: left_input,
+                alias: left_alias,
+                ..
+            }),
+            LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input: right_input,
+                alias: right_alias,
+                ..
+            }),
+        ) => {
+            let OptimizationResult {
+                plan,
+                renamed_alias,
+            } = optimize(left_input, right_input)?;
+            assert!(renamed_alias.is_none(), "Assert `renamed_alias` is `None` 
because nested `SubqueryAlias` shouldn't be possible");

Review Comment:
   Is there a reason we assert here instead of propagating an error?



##########
datafusion/optimizer/src/eliminate_self_join/unique_keyed.rs:
##########
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EliminateUniqueKeyedSelfJoin`] eliminates self joins on unique 
constraint columns
+
+use crate::{ApplyOrder, OptimizerConfig, OptimizerRule};
+use datafusion_common::{
+    tree_node::{Transformed, TreeNode, TreeNodeRecursion},
+    DFSchema, Result, TableReference,
+};
+use datafusion_expr::{
+    Expr, Join, JoinType, LogicalPlan, LogicalPlanBuilder, Projection, 
SubqueryAlias,
+    TableScan,
+};
+use indexmap::IndexSet;
+
+use super::{
+    is_table_scan_same, merge_table_scans, unique_indexes, OptimizationResult,
+    RenamedAlias,
+};
+
+#[derive(Default, Debug)]
+pub struct EliminateUniqueKeyedSelfJoin;
+
+impl EliminateUniqueKeyedSelfJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// Optimize self-join query by combining LHS and RHS of the join. Current 
implementation is
+/// very conservative. It only merges nodes if one of them is `TableScan`. It 
should be possible
+/// to merge projections and filters together as well.
+///
+/// TLDR; of current implementation is
+/// - If LHS and RHS is `TableScan`, then merge table scans,
+/// - If LHS is `TableScan` and RHS isn't `TableScan`, then find `TableScan` 
on RHS and merge them
+/// - If LHS isn't `TableScan` and RHS is `TableScan` recursively call 
`optimize` with children swapped
+/// - If LHS and RHS is `SubqueryAlias`, recursively call `optimize` with 
their input
+fn optimize(left: &LogicalPlan, right: &LogicalPlan) -> 
Option<OptimizationResult> {
+    match (left, right) {
+        (LogicalPlan::TableScan(left_scan), 
LogicalPlan::TableScan(right_scan)) => {
+            let table_scan = merge_table_scans(left_scan, right_scan);
+            let plan = LogicalPlan::TableScan(table_scan)
+                .recompute_schema()
+                .unwrap();
+            Some(OptimizationResult {
+                plan,
+                renamed_alias: None,
+            })
+        }
+        (
+            LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input: left_input,
+                alias: left_alias,
+                ..
+            }),
+            LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input: right_input,
+                alias: right_alias,
+                ..
+            }),
+        ) => {
+            let OptimizationResult {
+                plan,
+                renamed_alias,
+            } = optimize(left_input, right_input)?;
+            assert!(renamed_alias.is_none(), "Assert `renamed_alias` is `None` 
because nested `SubqueryAlias` shouldn't be possible");
+
+            let plan = LogicalPlanBuilder::new(plan)
+                .alias(left_alias.clone())
+                .unwrap()
+                .build()
+                .unwrap();
+            let plan = plan.recompute_schema().unwrap();
+            Some(OptimizationResult {
+                plan,
+                renamed_alias: Some(RenamedAlias {
+                    from: right_alias.clone(),
+                    to: left_alias.clone(),
+                }),
+            })
+        }
+        (LogicalPlan::TableScan(left_scan), _) => {
+            let transformed = right
+                .clone()
+                .transform_up(|plan| match &plan {
+                    LogicalPlan::TableScan(right_scan) => {
+                        let merged = merge_table_scans(left_scan, right_scan);
+                        Ok(Transformed::yes(LogicalPlan::TableScan(merged)))
+                    }
+                    _ => Ok(Transformed::no(plan)),
+                })
+                .unwrap();
+            assert!(
+                transformed.transformed,
+                "Called `transform_up` and no merged `TableScan`"
+            );
+            if transformed.transformed {
+                Some(OptimizationResult {
+                    plan: transformed.data,
+                    renamed_alias: None,
+                })
+            } else {
+                None
+            }
+        }
+        (_, LogicalPlan::TableScan(_)) => optimize(right, left),
+        _ => None,
+    }
+}
+
+fn try_resolve_join_on_columns_to_indexes(
+    join: &Join,
+    schema: &DFSchema,
+    source: &TableReference,
+    left_alias: Option<&TableReference>,
+    right_alias: Option<&TableReference>,
+) -> Option<IndexSet<usize>> {
+    let length = join.on.len();
+    let mut on_idx = IndexSet::with_capacity(length);
+
+    for on in &join.on {
+        let (left_col, right_col) = match on {
+            (Expr::Column(left_col), Expr::Column(right_col)) => (left_col, 
right_col),
+            _ => return None,
+        };
+        let source_ref = Some(source);
+
+        // If LHS column's alias isn't LHS's alias or table name then bail
+        let left_ref = left_col.relation.as_ref();

Review Comment:
   Right here, `relation` is returning `Some(relation)`. If it is `None`, then we 
don't want to bail out when both `left_ref != left_alias` and `left_ref != 
source_ref` evaluate to true. I believe it should only be checked against the 
alias and source if it is not `None` (unqualified references should be allowed).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to