timsaucer opened a new issue, #16740: URL: https://github.com/apache/datafusion/issues/16740
### Is your feature request related to a problem or challenge?

I have a case with two table providers that both produce hash-partitioned data, and I want to run efficient joins on these partition hashes. When the number of partitions of one table does not match the other, execution fails with an error recommending that the user add a `RepartitionExec`. The problem does not reproduce when the number of partitions is 14 or fewer, because the optimizer then adds a repartition.

### Describe the solution you'd like

I can think of two ways to handle this:

1. Detect during planning that the partition counts differ and add a `RepartitionExec` to the plan to account for this.
2. Update `HashJoinExec` to detect mismatched input partition counts, determine the correct number of output partitions, and compute the mapping from input partitions to output partitions.

### Describe alternatives you've considered

The only alternative I can think of is to add repartitions to the plan manually, but this does not seem like something each of our users should have to do.

### Additional context

Here is a minimal reproducible example in DataFusion 48.0.0.
When you run this code, you get the error `Error: Internal("Invalid HashJoinExec, partition count mismatch 14!=15,consider using RepartitionExec")`:

```rust
use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use arrow_schema::SchemaRef;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::catalog::{Session, TableProvider};
use datafusion::datasource::TableType;
use datafusion::error::Result;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion::prelude::*;
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::common::exec_err;

#[derive(Debug)]
struct MyTableProvider {
    num_partitions: usize,
}

#[async_trait]
impl TableProvider for MyTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        MyExecPlan::schema()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(MyExecPlan::new(self.num_partitions)))
    }
}

#[derive(Debug)]
struct MyExecPlan {
    props: PlanProperties,
    num_partitions: usize,
}

impl MyExecPlan {
    pub fn new(num_partitions: usize) -> Self {
        let schema = Self::schema();
        let partition_col =
            datafusion::physical_expr::expressions::col("a", schema.as_ref()).unwrap();
        Self {
            // Declare the output as hash-partitioned on column `a`
            props: PlanProperties::new(
                EquivalenceProperties::new(Self::schema()),
                datafusion::physical_expr::Partitioning::Hash(
                    vec![partition_col],
                    num_partitions,
                ),
                EmissionType::Final,
                Boundedness::Bounded,
            ),
            num_partitions,
        }
    }

    pub fn schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("a", DataType::UInt64, false),
            Field::new("b", DataType::UInt64, false),
        ]))
    }
}

impl ExecutionPlan for MyExecPlan {
    fn name(&self) -> &str {
        "MyExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.props
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        if partition >= self.num_partitions {
            return exec_err!("Invalid partition number {}", partition);
        }
        // Each partition emits 5 rows whose `a` value is the partition index
        let a: ArrayRef = Arc::new(UInt64Array::from(vec![partition as u64; 5]));
        let b: ArrayRef = Arc::new(UInt64Array::from((0..5).collect::<Vec<_>>()));
        let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;
        let schema = batch.schema();
        let stream = futures::stream::iter(vec![Ok(batch)]);
        let adapter = RecordBatchStreamAdapter::new(schema, stream);
        Ok(Box::pin(adapter))
    }
}

impl DisplayAs for MyExecPlan {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        f.write_str("MyExec")
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Two providers that differ only in partition count (14 vs 15)
    let table_provider_1 = Arc::new(MyTableProvider { num_partitions: 14 });
    let table_provider_2 = Arc::new(MyTableProvider { num_partitions: 15 });

    let _ = ctx.register_table("t1", table_provider_1)?;
    let _ = ctx.register_table("t2", table_provider_2)?;

    let df1 = ctx.table("t1").await?;
    let df2 = ctx.table("t2").await?;

    // Joining on `a` and executing the plan triggers the partition count mismatch error
    let df = df1.join(df2, JoinType::Inner, &["a"], &["a"], None)?;
    df.show().await?;

    Ok(())
}
```
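To illustrate why the counts must line up, and what the second option might compute, here is a rough stdlib-only sketch. It assumes (this is an assumption, not something every `TableProvider` guarantees) that a row whose join key hashes to `h` lands in partition `h % n`. Under that scheme a key hashing to 14 goes to partition 0 of a 14-partition input but partition 14 of a 15-partition input, so partition `i` on one side cannot simply be joined with partition `i` on the other. One possible mapping uses `g = gcd(left_n, right_n)` output partitions and routes input partition `i` to output partition `i % g`; since `g` divides both counts, matching keys always meet. `partition_of`, `gcd`, and `co_partition_mapping` are hypothetical helpers, not DataFusion APIs.

```rust
/// Hypothetical helper (not a DataFusion API): the partition a row lands in
/// when its join key hashes to `h` and the input has `n` hash partitions.
/// Assumes the `h % n` assignment scheme.
fn partition_of(h: u64, n: usize) -> usize {
    (h % n as u64) as usize
}

/// Greatest common divisor via Euclid's algorithm.
fn gcd(mut a: usize, mut b: usize) -> usize {
    while b != 0 {
        let t = a % b;
        a = b;
        b = t;
    }
    a
}

/// For each output partition: (left input partitions, right input partitions).
type PartitionGroups = Vec<(Vec<usize>, Vec<usize>)>;

/// Hypothetical mapping for a join whose inputs have `left_n` and `right_n`
/// hash partitions. Returns the output partition count and, per output
/// partition, which input partitions it must read from each side.
fn co_partition_mapping(left_n: usize, right_n: usize) -> (usize, PartitionGroups) {
    let g = gcd(left_n, right_n);
    let groups: PartitionGroups = (0..g)
        .map(|j| {
            // Input partition i holds keys with h % n == i. Because g divides n,
            // those keys also satisfy h % g == i % g, so they belong to output j.
            let left: Vec<usize> = (0..left_n).filter(|i| i % g == j).collect();
            let right: Vec<usize> = (0..right_n).filter(|i| i % g == j).collect();
            (left, right)
        })
        .collect();
    (g, groups)
}
```

For the 14 vs 15 case in the repro, `gcd(14, 15)` is 1, so this mapping stays correct but collapses the join into a single output partition. That suggests inserting a repartition (the first option) is likely the better fallback when the counts share no common factor, while a mapping like this only preserves parallelism when they do (for example, 12 and 8 partitions give 4 output partitions).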