mustafasrepo commented on issue #9370: URL: https://github.com/apache/datafusion/issues/9370#issuecomment-2074192047
As you say, `RepartitionExec::pull_from_input` is `async`. However, it spawns one task per input partition. Hence, when the plan contains `RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=1`, a single task consumes the input: it calculates the hash value of each row, then routes each row to the appropriate output partition according to that hash value. Since hash calculation can be expensive, doing it in a single task may be suboptimal. Hence, instead of `RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=1`, DataFusion plans contain

```
RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=8
--RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
```

where `RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1` consumes data from the input (also with a single task; however, in this case there is no per-row computation, so it is pretty fast) and re-routes the batches to its output partitions. Then `RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=8` spawns 8 `async` `pull_from_input` tasks for the hash calculation.

What we would like is to achieve this parallelism within `RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=1` itself (where the number of input partitions is 1).

I think one possible way to support this feature would be to add a method to `RepartitionExec` that

- reads from the input
- routes the read data to output channels (much like in `RoundRobin`).

Its API might be something like `RepartitionExec::split_input_data(input: Arc<dyn ExecutionPlan>) -> Vec<SendableRecordBatchStream>`. Then `pull_from_input` can work on the result of this stage: instead of `input: Arc<dyn ExecutionPlan>`, it can receive `input_stream: SendableRecordBatchStream`. This way, with a modification to `pull_from_input` and a new stage before it, I presume we can support this feature.
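To make the proposed two-stage flow concrete, here is a minimal, std-only sketch of the idea. It is not DataFusion code: std threads and `std::sync::mpsc` channels stand in for the tokio tasks and `SendableRecordBatchStream`s, a batch is modeled as a hypothetical `Batch = Vec<u64>` of hash keys, and `DefaultHasher` replaces DataFusion's row hashing. Only the name `split_input_data` comes from the proposal above; `hash_partition` and `hash_repartition` are hypothetical helpers. Stage 1 uses a single reader that round-robins whole batches with no per-row work, and stage 2 runs one hashing worker per split stream, so the per-row hashing runs in parallel even though there is only one input partition.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for a RecordBatch: a single column of u64 hash keys.
type Batch = Vec<u64>;

/// Stage 1 (the proposed `split_input_data`): one reader consumes the single
/// input partition and forwards whole batches round-robin to `n` channels.
/// It does no per-row work, so one reader is cheap.
fn split_input_data(input: Vec<Batch>, n: usize) -> Vec<Receiver<Batch>> {
    assert!(n > 0, "need at least one output channel");
    let (senders, receivers): (Vec<Sender<Batch>>, Vec<Receiver<Batch>>) =
        (0..n).map(|_| channel()).unzip();
    thread::spawn(move || {
        for (i, batch) in input.into_iter().enumerate() {
            // A send error only means the receiver was dropped; skip it.
            let _ = senders[i % n].send(batch);
        }
        // `senders` is dropped when this thread ends, closing every channel.
    });
    receivers
}

/// Hypothetical row-to-partition mapping: hash the key, take it modulo `n`.
fn hash_partition(key: u64, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % n as u64) as usize
}

/// Stage 2 (a modified `pull_from_input` working on a stream instead of a
/// plan): one worker per split stream hashes rows and routes them to the
/// `n` hash output partitions, which are collected here for inspection.
fn hash_repartition(input: Vec<Batch>, n: usize) -> Vec<Vec<u64>> {
    let (out_tx, out_rx): (Vec<Sender<Batch>>, Vec<Receiver<Batch>>) =
        (0..n).map(|_| channel()).unzip();
    let workers: Vec<_> = split_input_data(input, n)
        .into_iter()
        .map(|input_stream| {
            let out_tx = out_tx.clone();
            thread::spawn(move || {
                for batch in input_stream {
                    // Bucket this batch's rows by target output partition.
                    let mut buckets: Vec<Batch> = vec![Vec::new(); n];
                    for key in batch {
                        buckets[hash_partition(key, n)].push(key);
                    }
                    for (p, rows) in buckets.into_iter().enumerate() {
                        if !rows.is_empty() {
                            let _ = out_tx[p].send(rows);
                        }
                    }
                }
            })
        })
        .collect();
    drop(out_tx); // only the workers' clones keep the output channels open
    for w in workers {
        w.join().expect("hash worker panicked");
    }
    // Channels are unbounded and all senders are gone, so draining terminates.
    out_rx
        .into_iter()
        .map(|rx| rx.into_iter().flatten().collect())
        .collect()
}
```

For example, `hash_repartition(vec![vec![1, 2, 3], vec![4, 5]], 4)` returns four partitions in which every key `k` sits in partition `hash_partition(k, 4)`. The design mirrors the existing `RoundRobinBatch` + `Hash` plan pair, but keeps both stages inside one operator.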
