comphead commented on PR #14160:
URL: https://github.com/apache/datafusion/pull/14160#issuecomment-2605200055

   > > > BatchCoalescer
   > > 
   > > 
   > > Thanks @berkaysynnada for your feedback, if I got you right, you prefer 
to call the `CoalesceBatchesExec` just AFTER the `SortMergeJoinExec` called in 
physical planner like for other join types?
   > > I checked some tests in `joins.rs` and looks like the `CoalesceBatches` 
called before the join
   > > ```
   > >     let expected = {
   > >         [
   > >             "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, 
on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND 
CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
   > >             "  CoalesceBatchesExec: target_batch_size=8192",
   > >             "    RepartitionExec: partitioning=Hash([a2@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a1@0 ASC NULLS LAST",
   > >             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
   > >             // "     CsvExec: file_groups={1 group: 
[[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
   > >             "  CoalesceBatchesExec: target_batch_size=8192",
   > >             "    RepartitionExec: partitioning=Hash([a2@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a1@0 ASC NULLS LAST",
   > >             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
   > >             // "     CsvExec: file_groups={1 group: 
[[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
   > >         ]
   > >     };
   > > ```
   > > 
   > > 
   > >     
   > >       
   > >     
   > > 
   > >       
   > >     
   > > 
   > >     
   > >   
   > > perhaps I'm missing something?
   > 
   > CoalesceBatches' in your example exist because of hash repartitions 
(CoalesceBatches rule adds a CoalesceBatchesExec on top of FilterExec, 
HashJoinExec, and hash-repartition).
   > 
   > I've thought about this, and I believe the most optimal solution is to 
make all join operators capable of performing both coalescing and splitting in 
a built-in manner. This is because the output of a join can either be smaller 
or larger than the target batch size. Ideally, there should be no need (or only 
minimal need) for CoalesceBatchesExec.
   > 
   > To achieve this built-in coalescing and splitting, we can leverage 
existing tools like BatchSplitter and BatchCoalescer (although there are no 
current examples of BatchCoalescer being used in joins). My suggestion is to 
generalize these tools so they can be utilized by any operator and applied 
wherever this mechanism is needed. As this pattern becomes more common, it will 
be easier to expand its usage and simplify its application.
   
   Thanks @berkaysynnada. Builtin options probably can be implemented with the 
sending a `BatchCoalescer` into the join instead of writing the custom code 
like in this implementation. 
   
   WDYT if we merge this PR to fix the bug for now and I start a discussion to 
unify coalesce/split approaches for the joins?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to