timsaucer opened a new issue, #16740:
URL: https://github.com/apache/datafusion/issues/16740

   ### Is your feature request related to a problem or challenge?
   
   I have two table providers that produce hash-partitioned data, and I want 
   to join them efficiently on the partition hash. When the partition count of 
   one table does not match the other, execution fails with an error 
   recommending that the user add a `RepartitionExec`.
   
   The problem does not reproduce when the number of partitions is 14 or 
   fewer, because in that case the optimizer adds a repartition for us.
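   
   I assume (but have not confirmed) that this threshold is tied to the 
   session's `target_partitions` setting. For anyone experimenting with the 
   reproduction below, pinning that setting explicitly may make the behavior 
   consistent across machines. A minimal sketch, with an arbitrary value of 4:
   
   ```rust
   // In main() of the example below, SessionContext::new() could be replaced
   // with a context whose target_partitions is set explicitly. The value 4 is
   // arbitrary and only for experimentation.
   let config = SessionConfig::new().with_target_partitions(4);
   let ctx = SessionContext::new_with_config(config);
   ```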
   
   ### Describe the solution you'd like
   
   I can think of two ways to handle this. One is to detect during planning 
   that the partition counts differ and insert a `RepartitionExec` into the 
   plan to account for it. The other is to update `HashJoinExec` so that it 
   recognizes mismatched input partition counts and works out the correct 
   number of output partitions and the mapping from input partitions to 
   output partitions.
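   
   To make the first option more concrete, here is a rough sketch (the helper 
   name `align_join_input` is mine, and this is not a complete optimizer rule; 
   presumably the real fix would live in or near the existing 
   `EnforceDistribution` pass):
   
   ```rust
   use std::sync::Arc;
   use datafusion::error::Result;
   use datafusion::physical_expr::{Partitioning, PhysicalExpr};
   use datafusion::physical_plan::repartition::RepartitionExec;
   use datafusion::physical_plan::ExecutionPlan;
   
   /// Sketch of option 1: if `input` does not already produce `target` output
   /// partitions, wrap it in a hash `RepartitionExec` on the join keys so both
   /// sides of the join agree on the partition count.
   fn align_join_input(
       input: Arc<dyn ExecutionPlan>,
       join_keys: Vec<Arc<dyn PhysicalExpr>>,
       target: usize,
   ) -> Result<Arc<dyn ExecutionPlan>> {
       if input.properties().output_partitioning().partition_count() == target {
           Ok(input)
       } else {
           Ok(Arc::new(RepartitionExec::try_new(
               input,
               Partitioning::Hash(join_keys, target),
           )?))
       }
   }
   ```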
   
   ### Describe alternatives you've considered
   
   The only alternative I can think of is to add the repartition to the plan 
   manually, as sketched below, but this does not seem like something every 
   user should have to do.
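   
   For reference, the manual workaround I have in mind looks roughly like 
   this, applied to the reproduction below (14 is chosen to match the other 
   input's partition count):
   
   ```rust
   // In main() of the example below, before the join, one could repartition
   // t2's DataFrame so both inputs have 14 partitions. Requires
   // `use datafusion::logical_expr::Partitioning;`.
   let df2 = df2.repartition(Partitioning::Hash(vec![col("a")], 14))?;
   ```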
   
   ### Additional context
   
   Here is a minimal reproducible example against DataFusion 48.0.0. Running 
   it produces the error:
   `Error: Internal("Invalid HashJoinExec, partition count mismatch 14!=15,consider using RepartitionExec")`
   
   
   ```rust
   use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
   use arrow_schema::SchemaRef;
   use datafusion::arrow::datatypes::{DataType, Field, Schema};
   use datafusion::catalog::{Session, TableProvider};
   use datafusion::datasource::TableType;
   use datafusion::error::Result;
   use datafusion::execution::{SendableRecordBatchStream, TaskContext};
   use datafusion::physical_expr::EquivalenceProperties;
   use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
   use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
   use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
   use datafusion::prelude::*;
   use std::any::Any;
   use std::fmt::Formatter;
   use std::sync::Arc;
   use async_trait::async_trait;
   use datafusion::common::exec_err;
   
   #[derive(Debug)]
   struct MyTableProvider {
       num_partitions: usize,
   }
   
   #[async_trait]
   impl TableProvider for MyTableProvider {
       fn as_any(&self) -> &dyn Any {
           self
       }
   
       fn schema(&self) -> SchemaRef {
           MyExecPlan::schema()
       }
   
       fn table_type(&self) -> TableType {
           TableType::Base
       }
   
       async fn scan(
           &self,
           _state: &dyn Session,
           _projection: Option<&Vec<usize>>,
           _filters: &[Expr],
           _limit: Option<usize>,
       ) -> Result<Arc<dyn ExecutionPlan>> {
           Ok(Arc::new(MyExecPlan::new(self.num_partitions)))
       }
   }
   
   #[derive(Debug)]
   struct MyExecPlan {
       props: PlanProperties,
       num_partitions: usize,
   }
   
   impl MyExecPlan {
       pub fn new(num_partitions: usize) -> Self {
           let schema = Self::schema();
           let partition_col =
                datafusion::physical_expr::expressions::col("a", schema.as_ref()).unwrap();
           Self {
               props: PlanProperties::new(
                   EquivalenceProperties::new(Self::schema()),
                   datafusion::physical_expr::Partitioning::Hash(
                       vec![partition_col],
                       num_partitions,
                   ),
                   EmissionType::Final,
                   Boundedness::Bounded,
               ),
               num_partitions
           }
       }
   
       pub fn schema() -> Arc<Schema> {
           Arc::new(Schema::new(vec![
               Field::new("a", DataType::UInt64, false),
               Field::new("b", DataType::UInt64, false),
           ]))
       }
   }
   
   impl ExecutionPlan for MyExecPlan {
       fn name(&self) -> &str {
           "MyExec"
       }
   
       fn as_any(&self) -> &dyn Any {
           self
       }
   
       fn properties(&self) -> &PlanProperties {
           &self.props
       }
   
       fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
           vec![]
       }
   
       fn with_new_children(
           self: Arc<Self>,
           _children: Vec<Arc<dyn ExecutionPlan>>,
       ) -> Result<Arc<dyn ExecutionPlan>> {
           Ok(self)
       }
   
       fn execute(
           &self,
           partition: usize,
           _context: Arc<TaskContext>,
       ) -> Result<SendableRecordBatchStream> {
           if partition >= self.num_partitions {
               return exec_err!("Invalid partition number {}", partition);
           }
   
           let a: ArrayRef = Arc::new(UInt64Array::from(vec![partition as u64; 5]));
           let b: ArrayRef = Arc::new(UInt64Array::from((0..5).collect::<Vec<_>>()));
   
           let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;
           let schema = batch.schema();
           let stream = futures::stream::iter(vec![Ok(batch)]);
           let adapter = RecordBatchStreamAdapter::new(schema, stream);
           Ok(Box::pin(adapter))
       }
   }
   
   impl DisplayAs for MyExecPlan {
       fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
           f.write_str("MyExec")
       }
   }
   
   #[tokio::main]
   async fn main() -> Result<()> {
       let ctx = SessionContext::new();
   
       let table_provider_1 = Arc::new(MyTableProvider { num_partitions: 14 });
       let table_provider_2 = Arc::new(MyTableProvider { num_partitions: 15 });
   
       let _ = ctx.register_table("t1", table_provider_1)?;
       let _ = ctx.register_table("t2", table_provider_2)?;
   
       let df1 = ctx.table("t1").await?;
       let df2 = ctx.table("t2").await?;
   
       let df = df1.join(df2, JoinType::Inner, &["a"], &["a"], None)?;
   
       df.show().await?;
   
       Ok(())
   }
   ```

