my-vegetable-has-exploded commented on code in PR #12754: URL: https://github.com/apache/datafusion/pull/12754#discussion_r1798196952
########## datafusion/physical-plan/src/joins/ie_join.rs: ########## @@ -0,0 +1,1133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::collections::BTreeMap; +use std::fmt::Formatter; +use std::ops::Range; +use std::sync::Arc; +use std::task::Poll; + +use crate::joins::utils::{ + apply_join_filter_to_indices, build_batch_from_indices, build_join_schema, + check_inequality_condition, check_join_is_valid, estimate_join_statistics, + inequality_conditions_to_sort_exprs, is_loose_inequality_operator, ColumnIndex, + JoinFilter, OnceAsync, OnceFut, +}; +use crate::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; +use crate::sorts::sort::sort_batch; +use crate::{ + collect, execution_mode_from_children, DisplayAs, DisplayFormatType, Distribution, + ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + RecordBatchStream, SendableRecordBatchStream, +}; +use arrow::array::{make_comparator, AsArray, UInt64Builder}; + +use arrow::compute::concat; +use arrow::compute::kernels::sort::SortOptions; +use arrow::datatypes::{Int64Type, Schema, SchemaRef, UInt64Type}; +use arrow::record_batch::RecordBatch; +use arrow_array::{ArrayRef, Int64Array, UInt64Array}; +use 
datafusion_common::{plan_err, JoinSide, Result, Statistics}; +use datafusion_execution::TaskContext; +use datafusion_expr::{JoinType, Operator}; +use datafusion_physical_expr::equivalence::join_equivalence_properties; + +use datafusion_physical_expr::{Partitioning, PhysicalSortExpr, PhysicalSortRequirement}; +use futures::{ready, Stream}; +use parking_lot::Mutex; + +/// IEJoinExec is an optimized join for queries without any equijoin conditions in the `ON` clause but with two or more inequality conditions. +/// For a detailed description of the algorithm, see <https://vldb.org/pvldb/vol8/p2074-khayyat.pdf> +/// +/// Take this query q as an example: +/// +/// SELECT t1.t_id, t2.t_id +/// FROM west t1, west t2 +/// WHERE t1.time < t2.time AND t1.cost < t2.cost +/// +/// There is no equijoin condition in the `ON` clause, but there are two inequality conditions. +/// Currently, left table is t1, right table is t2. +/// +/// The brief idea of this algorithm is to convert the join into an ordered pair/inversion pair problem on a permutation. For a permutation a[0..n-1], a pair (i, j) such that i < j and a\[i\] < a\[j\] is called an ordered pair of the permutation. +/// +/// For example, for a[0..4] = [2, 1, 3, 0], there are 2 ordered pairs (given by value): (2, 3), (1, 3) +/// +/// To convert query q to the ordered pair of permutation problem, we do the following steps: +/// 1. Sort t1 union t2 by time in ascending order, mark the sorted table as l1. +/// 2. Sort t1 union t2 by cost in ascending order, mark the sorted table as l2. +/// 3. For each element e_i in l2, find the index j in l1 such that l1\[j\] = e_i, mark the computed index as permutation array p. If p\[i\] = j, it means that the ith element in l2 is the jth element in l1. +/// 4. Compute the ordered pair of permutation array p. For a pair (i, j) in l2, if i < j then e_i.cost < e_j.cost because l2 is sorted by cost in ascending order. And if p\[i\] < p\[j\], then e_i.time < e_j.time because l1 is sorted by time in ascending order. +/// 5. 
The result of query q is the pairs (i, j) in l2 such that i < j and p\[i\] < p\[j\] and e_i is from right table and e_j is from left table. +/// +/// To get the final result, we need to get all the pairs (i, j) in l2 such that i < j and p\[i\] < p\[j\] and e_i is from right table and e_j is from left table. We can do this by the following steps: +/// 1. Traverse l2 from left to right; at offset j, we can maintain a BTreeSet or bitmap to record all the p\[i\] such that i < j, then find all the pairs (i, j) in l2 such that p\[i\] < p\[j\]. +/// +/// To parallelize the above algorithm, we can first sort t1 and t2 by time (condition 1) and repartition the data into N partitions, then join t1\[i\] and t2\[j\] respectively. And if the minimum time of t1\[i\] is greater than the maximum time of t2\[j\], we can skip the join of t1\[i\] and t2\[j\] because there is no join result between them according to condition 1. +#[derive(Debug)] +pub struct IEJoinExec { + /// left side, which has been sorted by condition 1 + pub(crate) left: Arc<dyn ExecutionPlan>, + /// right side, which has been sorted by condition 1 + pub(crate) right: Arc<dyn ExecutionPlan>, + /// inequality conditions for iejoin, for example, t1.time > t2.time and t1.cost < t2.cost; only two inequality conditions are supported, other conditions will be stored in `filter` + pub(crate) inequality_conditions: Vec<JoinFilter>, + /// filters which are applied while finding matching rows + pub(crate) filter: Option<JoinFilter>, + /// how the join is performed + pub(crate) join_type: JoinType, + /// the schema once the join is applied + schema: SchemaRef, + /// data for iejoin + iejoin_data: OnceAsync<IEJoinData>, + /// left condition, it represents `t1.time asc` and `t1.cost asc` in above example + left_conditions: Arc<[PhysicalSortExpr; 2]>, + /// right condition, it represents `t2.time asc` and `t2.cost asc` in above example + right_conditions: Arc<[PhysicalSortExpr; 2]>, + /// operator of the inequality condition + operators: 
Arc<[Operator; 2]>, + /// sort options of the inequality conditions, it represents `asc` and `asc` in above example + sort_options: Arc<[SortOptions; 2]>, + /// partition pairs, used to get the next pair of left and right blocks, IEJoinStream handles one pair of blocks each time + pairs: Arc<Mutex<u64>>, + /// Information of index and left / right placement of columns + column_indices: Vec<ColumnIndex>, + // TODO: add memory reservation? + /// execution metrics + metrics: ExecutionPlanMetricsSet, + /// cache holding plan properties like equivalences, output partitioning etc. + cache: PlanProperties, +} + +impl IEJoinExec { + /// Try to create a new [`IEJoinExec`] + pub fn try_new( + left: Arc<dyn ExecutionPlan>, + right: Arc<dyn ExecutionPlan>, + inequality_conditions: Vec<JoinFilter>, + filter: Option<JoinFilter>, + join_type: &JoinType, + target_partitions: usize, + ) -> Result<Self> { + let left_schema = left.schema(); + let right_schema = right.schema(); + check_join_is_valid(&left_schema, &right_schema, &[])?; + let (schema, column_indices) = + build_join_schema(&left_schema, &right_schema, join_type); + if inequality_conditions.len() != 2 { + return plan_err!( + "IEJoinExec only supports two inequality conditions, got {}", + inequality_conditions.len() + ); + } + for condition in &inequality_conditions { + check_inequality_condition(condition)?; + } + let schema = Arc::new(schema); + if !matches!(join_type, JoinType::Inner) { + return plan_err!( + "IEJoinExec only supports inner join currently, got {}", + join_type + ); + } + let cache = Self::compute_properties( + &left, + &right, + Arc::clone(&schema), + *join_type, + target_partitions, + ); + let condition_parts = + inequality_conditions_to_sort_exprs(&inequality_conditions)?; + let left_conditions = + Arc::new([condition_parts[0].0.clone(), condition_parts[1].0.clone()]); + let right_conditions = + Arc::new([condition_parts[0].1.clone(), condition_parts[1].1.clone()]); + let operators = 
Arc::new([condition_parts[0].2, condition_parts[1].2]); + let sort_options = Arc::new([ + operator_to_sort_option(operators[0]), + operator_to_sort_option(operators[1]), + ]); + + Ok(IEJoinExec { + left, + right, + inequality_conditions, + filter, + join_type: *join_type, + schema, + iejoin_data: Default::default(), + left_conditions, + right_conditions, + operators, + sort_options, + pairs: Arc::new(Mutex::new(0)), + column_indices, + metrics: Default::default(), + cache, + }) + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties( + left: &Arc<dyn ExecutionPlan>, + right: &Arc<dyn ExecutionPlan>, + schema: SchemaRef, + join_type: JoinType, + target_partitions: usize, + ) -> PlanProperties { + // Calculate equivalence properties: + let eq_properties = join_equivalence_properties( + left.equivalence_properties().clone(), + right.equivalence_properties().clone(), + &join_type, + schema, + &[false, false], + None, + // No on columns in iejoin + &[], + ); + + let output_partitioning = Partitioning::UnknownPartitioning(target_partitions); + + // Determine execution mode + let mut mode = execution_mode_from_children([left, right]); + if mode.is_unbounded() { + mode = ExecutionMode::PipelineBreaking; + } + + PlanProperties::new(eq_properties, output_partitioning, mode) + } +} + +impl DisplayAs for IEJoinExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let display_filter = self.filter.as_ref().map_or_else( + || "".to_string(), + |f| format!(", filter={}", f.expression()), + ); + let display_inequality_conditions = self + .inequality_conditions + .iter() + .map(|c| format!("({})", c.expression())) + .collect::<Vec<String>>() + .join(", "); + write!( + f, + "IEJoinExec: mode={:?}, join_type={:?}, inequality_conditions=[{}]{}", + 
self.cache.execution_mode, + self.join_type, + display_inequality_conditions, + display_filter, + ) + } + } + } +} + +/// convert operator to sort option for iejoin +/// for left.a <= right.b, the sort option is ascending order +/// for left.a >= right.b, the sort option is descending order +pub fn operator_to_sort_option(op: Operator) -> SortOptions { + match op { + Operator::Lt | Operator::LtEq => SortOptions { + descending: false, + nulls_first: false, + }, + Operator::Gt | Operator::GtEq => SortOptions { + descending: true, + nulls_first: false, + }, + _ => panic!("Unsupported operator"), + } +} + +impl ExecutionPlan for IEJoinExec { + fn name(&self) -> &'static str { + "IEJoinExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![&self.left, &self.right] + } + + fn required_input_distribution(&self) -> Vec<Distribution> { + vec![Distribution::SinglePartition, Distribution::SinglePartition] Review Comment: The inputs will be sorted by condition1 firstly, so I required SinglePartition here. But not so sure about it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org