jonathanc-n commented on code in PR #16023:
URL: https://github.com/apache/datafusion/pull/16023#discussion_r2150926283


##########
datafusion/optimizer/src/eliminate_self_join/unique_keyed.rs:
##########
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EliminateUniqueKeyedSelfJoin`] eliminates self joins on unique 
constraint columns
+
+use crate::{ApplyOrder, OptimizerConfig, OptimizerRule};
+use datafusion_common::{
+    tree_node::{Transformed, TreeNode, TreeNodeRecursion},
+    DFSchema, Result, TableReference,
+};
+use datafusion_expr::{
+    Expr, Join, JoinType, LogicalPlan, LogicalPlanBuilder, Projection, 
SubqueryAlias,
+    TableScan,
+};
+use indexmap::IndexSet;
+
+use super::{
+    is_table_scan_same, merge_table_scans, unique_indexes, OptimizationResult,
+    RenamedAlias,
+};
+
+#[derive(Default, Debug)]
+pub struct EliminateUniqueKeyedSelfJoin;
+
+impl EliminateUniqueKeyedSelfJoin {

Review Comment:
   I think renaming this to `EliminateUniqueKeySelfJoin` would read more cleanly — just a nit, though.



##########
datafusion/optimizer/src/eliminate_self_join/unique_keyed.rs:
##########
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EliminateUniqueKeyedSelfJoin`] eliminates self joins on unique 
constraint columns
+
+use crate::{ApplyOrder, OptimizerConfig, OptimizerRule};
+use datafusion_common::{
+    tree_node::{Transformed, TreeNode, TreeNodeRecursion},
+    DFSchema, Result, TableReference,
+};
+use datafusion_expr::{
+    Expr, Join, JoinType, LogicalPlan, LogicalPlanBuilder, Projection, 
SubqueryAlias,
+    TableScan,
+};
+use indexmap::IndexSet;
+
+use super::{
+    is_table_scan_same, merge_table_scans, unique_indexes, OptimizationResult,
+    RenamedAlias,
+};
+
+#[derive(Default, Debug)]
+pub struct EliminateUniqueKeyedSelfJoin;
+
+impl EliminateUniqueKeyedSelfJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// Optimize self-join query by combining LHS and RHS of the join. Current 
implementation is
+/// very conservative. It only merges nodes if one of them is `TableScan`. It 
should be possible
+/// to merge projections and filters together as well.
+///
+/// TLDR; of current implementation is
+/// - If LHS and RHS is `TableScan`, then merge table scans,
+/// - If LHS is `TableScan` and RHS isn't `TableScan`, then find `TableScan` 
on RHS and merge them
+/// - If LHS isn't `TableScan` and RHS is `TableScan` recursively call 
`optimize` with children swapped
+/// - If LHS and RHS is `SubqueryAlias`, recursively call `optimize` with 
their input
+fn optimize(left: &LogicalPlan, right: &LogicalPlan) -> 
Option<OptimizationResult> {
+    match (left, right) {
+        (LogicalPlan::TableScan(left_scan), 
LogicalPlan::TableScan(right_scan)) => {
+            let table_scan = merge_table_scans(left_scan, right_scan);
+            let plan = LogicalPlan::TableScan(table_scan)
+                .recompute_schema()
+                .unwrap();
+            Some(OptimizationResult {
+                plan,
+                renamed_alias: None,
+            })
+        }
+        (
+            LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input: left_input,
+                alias: left_alias,
+                ..
+            }),
+            LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input: right_input,
+                alias: right_alias,
+                ..
+            }),
+        ) => {
+            let OptimizationResult {
+                plan,
+                renamed_alias,
+            } = optimize(left_input, right_input)?;
+            assert!(renamed_alias.is_none(), "Assert `renamed_alias` is `None` 
because nested `SubqueryAlias` shouldn't be possible");
+
+            let plan = LogicalPlanBuilder::new(plan)
+                .alias(left_alias.clone())
+                .unwrap()
+                .build()
+                .unwrap();
+            let plan = plan.recompute_schema().unwrap();
+            Some(OptimizationResult {
+                plan,
+                renamed_alias: Some(RenamedAlias {
+                    from: right_alias.clone(),
+                    to: left_alias.clone(),
+                }),
+            })
+        }
+        (LogicalPlan::TableScan(left_scan), _) => {
+            let transformed = right
+                .clone()
+                .transform_up(|plan| match &plan {
+                    LogicalPlan::TableScan(right_scan) => {
+                        let merged = merge_table_scans(left_scan, right_scan);
+                        Ok(Transformed::yes(LogicalPlan::TableScan(merged)))
+                    }
+                    _ => Ok(Transformed::no(plan)),
+                })
+                .unwrap();
+            assert!(
+                transformed.transformed,
+                "Called `transform_up` and no merged `TableScan`"
+            );
+            if transformed.transformed {
+                Some(OptimizationResult {
+                    plan: transformed.data,
+                    renamed_alias: None,
+                })
+            } else {
+                None
+            }
+        }
+        (_, LogicalPlan::TableScan(_)) => optimize(right, left),
+        _ => None,
+    }
+}
+
+fn try_resolve_join_on_columns_to_indexes(
+    join: &Join,
+    schema: &DFSchema,
+    source: &TableReference,
+    left_alias: Option<&TableReference>,
+    right_alias: Option<&TableReference>,
+) -> Option<IndexSet<usize>> {
+    let length = join.on.len();
+    let mut on_idx = IndexSet::with_capacity(length);
+
+    for on in &join.on {
+        let (left_col, right_col) = match on {
+            (Expr::Column(left_col), Expr::Column(right_col)) => (left_col, 
right_col),
+            _ => return None,
+        };
+        let source_ref = Some(source);
+
+        // If LHS column's alias isn't LHS's alias or table name then bail
+        let left_ref = left_col.relation.as_ref();
+        if left_ref != left_alias && left_ref != source_ref {
+            return None;
+        }
+        // If RHS column's alias isn't RHS's alias or table name then bail
+        let right_ref = right_col.relation.as_ref();
+        if right_ref != right_alias && right_ref != source_ref {
+            return None;
+        }
+
+        // It's safe to resolve column's without their qualifiers as we know 
source `TableSource` are the same.
+        let left_idx = schema.index_of_column_by_name(None, left_col.name())?;
+        let right_idx = schema.index_of_column_by_name(None, 
right_col.name())?;
+
+        // If LHS and RHS are different then optimization is impossible
+        if left_idx != right_idx {
+            return None;
+        }
+        on_idx.insert(left_idx);
+    }
+    Some(on_idx)
+}
+
+#[derive(Debug)]
+struct Resolution {
+    /// Source `TableScan`
+    table_scan: TableScan,
+    /// Column indexes into `TableScan` that form a unique index
+    alias: Option<TableReference>,
+}
+
+fn try_resolve_to_table_scan_alias(branch: &LogicalPlan) -> Option<Resolution> 
{

Review Comment:
   I don't think recursing to the deepest scan is the correct behaviour here. For example, the left scan could be semantically equal to the right scan, except that the right scan has been split into two different scans. I think that case should be handled by an `OptimizerRule` in its own right, though 🤔.
   
   A very simple example: if you UNION two scans of the same table, the plan does not combine them into one scan:
   ```
   explain (SELECT id FROM users) UNION ALL (SELECT birthday FROM users);
   +---------------+------------------------------------------------------------+
   | plan_type     | plan                                                       |
   +---------------+------------------------------------------------------------+
   | physical_plan | ┌───────────────────────────┐                              |
   |               | │         UnionExec         ├──────────────┐               |
   |               | └─────────────┬─────────────┘              │               |
   |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
   |               | │       DataSourceExec      ││       ProjectionExec      │ |
   |               | │    --------------------   ││    --------------------   │ |
   |               | │          bytes: 0         ││        id: birthday       │ |
   |               | │       format: memory      ││                           │ |
   |               | │          rows: 0          ││                           │ |
   |               | └───────────────────────────┘└─────────────┬─────────────┘ |
   |               |                              ┌─────────────┴─────────────┐ |
   |               |                              │       DataSourceExec      │ |
   |               |                              │    --------------------   │ |
   |               |                              │          bytes: 0         │ |
   |               |                              │       format: memory      │ |
   |               |                              │          rows: 0          │ |
   |               |                              └───────────────────────────┘ |
   |               |                                                            |
   +---------------+------------------------------------------------------------+
   ```
   I believe the current implementation is enough to handle this, since it will simply bail out when comparing against the join indexes, but I'd like to get your thoughts.



##########
datafusion/optimizer/src/eliminate_self_join/unique_keyed.rs:
##########
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EliminateUniqueKeyedSelfJoin`] eliminates self joins on unique 
constraint columns
+
+use crate::{ApplyOrder, OptimizerConfig, OptimizerRule};
+use datafusion_common::{
+    tree_node::{Transformed, TreeNode, TreeNodeRecursion},
+    DFSchema, Result, TableReference,
+};
+use datafusion_expr::{
+    Expr, Join, JoinType, LogicalPlan, LogicalPlanBuilder, Projection, 
SubqueryAlias,
+    TableScan,
+};
+use indexmap::IndexSet;
+
+use super::{
+    is_table_scan_same, merge_table_scans, unique_indexes, OptimizationResult,
+    RenamedAlias,
+};
+
+#[derive(Default, Debug)]
+pub struct EliminateUniqueKeyedSelfJoin;
+
+impl EliminateUniqueKeyedSelfJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// Optimize self-join query by combining LHS and RHS of the join. Current 
implementation is
+/// very conservative. It only merges nodes if one of them is `TableScan`. It 
should be possible

Review Comment:
   Let's open an issue for this?



##########
datafusion/optimizer/src/eliminate_self_join/mod.rs:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion_common::{
+    tree_node::{Transformed, TreeNode},
+    Column, DFSchema, Dependency, Result, TableReference,
+};
+use datafusion_expr::{expr::Alias, Expr, LogicalPlan, TableScan};
+
+mod aggregation;
+mod unique_keyed;
+
+pub use aggregation::EliminateAggregationSelfJoin;
+use indexmap::IndexSet;
+pub use unique_keyed::EliminateUniqueKeyedSelfJoin;
+
+fn merge_table_scans(left_scan: &TableScan, right_scan: &TableScan) -> 
TableScan {
+    let filters = left_scan
+        .filters
+        .iter()
+        .chain(right_scan.filters.iter())
+        .cloned()
+        .collect();
+    // FIXME: double iteration over the filters
+    let projection = match (&left_scan.projection, &right_scan.projection) {
+        (Some(left_projection), Some(right_projection)) => Some(
+            left_projection
+                .iter()
+                .chain(right_projection.iter())
+                .cloned()
+                .collect::<IndexSet<_>>()
+                .into_iter()
+                .collect::<Vec<_>>(),
+        ),
+        (Some(left_projection), None) => Some(left_projection.clone()),
+        (None, Some(right_projection)) => Some(right_projection.clone()),
+        (None, None) => None,
+    };
+    let fetch = match (left_scan.fetch, right_scan.fetch) {
+        (Some(left_fetch), Some(right_fetch)) => 
Some(left_fetch.max(right_fetch)),
+        (Some(rows), None) | (None, Some(rows)) => Some(rows),
+        (None, None) => None,
+    };
+    TableScan::try_new(
+        left_scan.table_name.clone(),
+        Arc::clone(&left_scan.source),
+        projection,
+        filters,
+        fetch,
+    )
+    .unwrap()
+}
+
+// TODO: equality of `inner` `apachearrow::datatypes::SchemaRef` doesn't mean 
equality of the tables
+fn is_table_scan_same(left: &TableScan, right: &TableScan) -> bool {

Review Comment:
   Should we also compare the filters, projection, and source?



##########
datafusion/optimizer/src/eliminate_self_join/mod.rs:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion_common::{
+    tree_node::{Transformed, TreeNode},
+    Column, DFSchema, Dependency, Result, TableReference,
+};
+use datafusion_expr::{expr::Alias, Expr, LogicalPlan, TableScan};
+
+mod aggregation;
+mod unique_keyed;
+
+pub use aggregation::EliminateAggregationSelfJoin;
+use indexmap::IndexSet;
+pub use unique_keyed::EliminateUniqueKeyedSelfJoin;
+
+fn merge_table_scans(left_scan: &TableScan, right_scan: &TableScan) -> 
TableScan {
+    let filters = left_scan
+        .filters
+        .iter()
+        .chain(right_scan.filters.iter())
+        .cloned()
+        .collect();
+    // FIXME: double iteration over the filters
+    let projection = match (&left_scan.projection, &right_scan.projection) {
+        (Some(left_projection), Some(right_projection)) => Some(
+            left_projection
+                .iter()
+                .chain(right_projection.iter())
+                .cloned()
+                .collect::<IndexSet<_>>()
+                .into_iter()
+                .collect::<Vec<_>>(),
+        ),
+        (Some(left_projection), None) => Some(left_projection.clone()),
+        (None, Some(right_projection)) => Some(right_projection.clone()),
+        (None, None) => None,
+    };
+    let fetch = match (left_scan.fetch, right_scan.fetch) {
+        (Some(left_fetch), Some(right_fetch)) => 
Some(left_fetch.max(right_fetch)),
+        (Some(rows), None) | (None, Some(rows)) => Some(rows),
+        (None, None) => None,
+    };
+    TableScan::try_new(
+        left_scan.table_name.clone(),
+        Arc::clone(&left_scan.source),
+        projection,
+        filters,
+        fetch,
+    )
+    .unwrap()

Review Comment:
   We should return a `Result` here instead of unwrapping.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to