mustafasrepo commented on issue #9370:
URL: https://github.com/apache/datafusion/issues/9370#issuecomment-2074192047

   As you say, `RepartitionExec::pull_from_input` is `async`. However, it 
starts one job for each input partition. Hence, when the plan contains 
`RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=1`, it 
starts a single job to consume the input. That job calculates the hash value 
of each row and then routes the row to the appropriate output partition 
according to its hash value. Since hash calculation may be intensive, doing 
all of it in a single job may be suboptimal. Hence, instead of 
   `RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=1`
   DataFusion plans contain
   ```
   RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=8
   --RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
   ```
   where `RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1` 
consumes data from the input (also with a single job, but in this case there 
is no computation, so it runs quite fast) and then re-routes the batches to 
the output partitions. Then 
`RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=8` 
starts 8 `async pull_from_input` jobs for hash calculation. What we would 
like is to achieve this parallelism in 
`RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=1` 
itself (where the number of input partitions is 1).
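
   To make the division of work concrete, here is a toy model of the 
two-stage plan. It is only a sketch and not DataFusion code: `Batch`, 
`output_partition`, `round_robin` and `hash_route` are names made up for 
illustration, batches are plain `Vec<u64>` instead of record batches, and 
std's `DefaultHasher` stands in for the real hash function.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy "batch": a vector of keys (real record batches are columnar).
type Batch = Vec<u64>;

/// Output partition of a key: hash it, then reduce modulo the partition count.
fn output_partition(key: u64, num_partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_partitions as u64) as usize
}

/// Stage 1 (RoundRobinBatch): deal whole batches to lanes in turn.
/// There is no per-row work here, so a single job is cheap.
fn round_robin(batches: Vec<Batch>, num_lanes: usize) -> Vec<Vec<Batch>> {
    let mut lanes = vec![Vec::new(); num_lanes];
    for (i, batch) in batches.into_iter().enumerate() {
        lanes[i % num_lanes].push(batch);
    }
    lanes
}

/// Stage 2 (Hash): the per-row work done for ONE lane.
/// With 8 lanes, 8 such calls can run independently of each other.
fn hash_route(lane: &[Batch], num_partitions: usize) -> Vec<Vec<u64>> {
    let mut outputs = vec![Vec::new(); num_partitions];
    for batch in lane {
        for &key in batch {
            outputs[output_partition(key, num_partitions)].push(key);
        }
    }
    outputs
}
```

   Each lane only hashes the batches it was dealt, and a given key maps to 
the same output partition no matter which lane processes it, which is why 
the hash stage can be split across jobs.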
   
   I think one possible way to support this feature would be to add a method 
to `RepartitionExec` which
   - reads from the input 
   - routes the data it reads to output channels (much like in `RoundRobin`). 
   Its API might be something like `RepartitionExec::split_input_data(input: 
Arc<dyn ExecutionPlan>) -> Vec<SendableRecordBatchStream>`
   Then `pull_from_input` could work on the result of this stage: instead of 
`input: Arc<dyn ExecutionPlan>`, it would receive `input_stream: 
SendableRecordBatchStream`. This way, with a modification to 
`pull_from_input` and a new stage before `pull_from_input`, I presume we 
can support this feature.
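
   A rough sketch of that shape, under loud assumptions: it uses OS threads 
and `std::sync::mpsc` channels as stand-ins for async tasks and 
`SendableRecordBatchStream`, a `Vec<Batch>` as a stand-in for the input 
plan, and the signatures of `split_input_data`, `pull_from_input` and 
`repartition` below are hypothetical, not the existing DataFusion ones.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Toy "batch": a vector of keys.
type Batch = Vec<u64>;

/// Hypothetical split stage: one thread reads the single input and deals
/// whole batches to `n` channels in turn (no per-row work).
fn split_input_data(input: Vec<Batch>, n: usize) -> Vec<Receiver<Batch>> {
    assert!(n > 0, "need at least one output stream");
    let (senders, receivers): (Vec<Sender<Batch>>, Vec<Receiver<Batch>>) =
        (0..n).map(|_| channel()).unzip();
    thread::spawn(move || {
        for (i, batch) in input.into_iter().enumerate() {
            // A send error only means the receiving side has gone away.
            let _ = senders[i % n].send(batch);
        }
        // `senders` is dropped here, which closes every channel.
    });
    receivers
}

/// Hypothetical `pull_from_input` that takes a stream instead of a plan:
/// hashes each row and forwards it to the matching output channel.
fn pull_from_input(input_stream: Receiver<Batch>, outputs: Vec<Sender<u64>>) {
    for batch in input_stream {
        for key in batch {
            let mut hasher = DefaultHasher::new();
            key.hash(&mut hasher);
            let partition = (hasher.finish() % outputs.len() as u64) as usize;
            let _ = outputs[partition].send(key);
        }
    }
}

/// Wires the two stages together and collects every output partition.
fn repartition(input: Vec<Batch>, n: usize) -> Vec<Vec<u64>> {
    let (out_tx, out_rx): (Vec<Sender<u64>>, Vec<Receiver<u64>>) =
        (0..n).map(|_| channel()).unzip();
    let workers: Vec<_> = split_input_data(input, n)
        .into_iter()
        .map(|stream| {
            let outputs = out_tx.clone();
            thread::spawn(move || pull_from_input(stream, outputs))
        })
        .collect();
    drop(out_tx); // from here on only the workers hold output senders
    for worker in workers {
        worker.join().expect("worker panicked");
    }
    out_rx.into_iter().map(|rx| rx.into_iter().collect()).collect()
}
```

   For example, `repartition(batches, 8)` runs one reader plus 8 hashing 
workers over a single input, which is the parallelism the two-operator plan 
gets today.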
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

