alamb commented on issue #16707:
URL: https://github.com/apache/datafusion/issues/16707#issuecomment-3048476870

   ```shell
   python3 -m venv test_venv
   source test_venv/bin/activate
   pip install duckdb datafusion numpy
   python3 repro.py
   ```
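   For context, here is a minimal sketch of the shape of `repro.py` (the real script is in the issue description; the table name and query are from this comment, the 676 distinct groups come from the `explain(analyze=True)` metrics below, and the row count is an assumption):
   ```python
   import time

   import duckdb
   import numpy as np
   import pyarrow as pa
   from datafusion import SessionContext

   # Assumed data shape: many rows, 676 distinct group keys (per the metrics below)
   N = 50_000_000
   pa_table = pa.table({
       "name": pa.array(np.random.randint(0, 676, N).astype(str)),
       "value": pa.array(np.random.rand(N)),
   })

   QUERY = "select name, sum(value) as value FROM pa_table group by name"

   # duckdb picks up the local pyarrow table via a replacement scan
   start = time.perf_counter()
   duckdb.sql(QUERY).fetchall()
   print(f"duckdb      : {(time.perf_counter() - start) * 1000:.2f}ms")

   # register the table with DataFusion as a single in-memory partition
   ctx = SessionContext()
   ctx.register_record_batches("pa_table", [pa_table.to_batches()])
   start = time.perf_counter()
   ctx.sql(QUERY).collect()
   print(f"datafusion  : {(time.perf_counter() - start) * 1000:.2f}ms")
   ```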
   
   
   
   I see this output:
   ```
   duckdb      : 246.51ms
   datafusion  : 931.72ms
   ```
   
   One thing I noticed is that only one CPU core is used while the query runs.
   
   I could not seem to get a useful profile (the stack traces I captured didn't show any useful stack frames).
   
   Profiling command
   ```shell
   samply record  python3 repro.py
   ```
   
   <img width="2339" height="1176" alt="Image" src="https://github.com/user-attachments/assets/61dab740-6048-4907-b56a-b8d41ff429c9" />
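   
   In case it helps, a mixed-mode profiler might recover the missing frames. A sketch assuming `py-spy` is available (its `--native` mode samples Rust frames alongside Python ones):
   ```shell
   pip install py-spy
   # record a flamegraph including native (Rust) stack frames
   py-spy record --native -o profile.svg -- python3 repro.py
   ```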
   
   
   Here is the query
   ```sql
   select name, sum(value) as value FROM pa_table group by name;
   ```
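   
   The plans below can be reproduced from Python via the `DataFrame.explain` API (assuming the `ctx` and `QUERY` names from the sketch above):
   ```python
   df = ctx.sql(QUERY)
   df.explain()              # prints the logical and physical plans
   df.explain(analyze=True)  # executes the query and prints per-operator metrics
   ```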
   
   Here is the explain plan 
   
   ```
   +---------------+------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                     |
   +---------------+------------------------------------------------------------------------------------------+
   | logical_plan  | Projection: pa_table.name, sum(pa_table.value) AS value                                 |
   |               |   Aggregate: groupBy=[[pa_table.name]], aggr=[[sum(pa_table.value)]]                    |
   |               |     TableScan: pa_table projection=[name, value]                                        |
   | physical_plan | ProjectionExec: expr=[name@0 as name, sum(pa_table.value)@1 as value]                   |
   |               |   AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[sum(pa_table.value)]|
   |               |     CoalesceBatchesExec: target_batch_size=8192                                         |
   |               |       RepartitionExec: partitioning=Hash([name@0], 16), input_partitions=16             |
   |               |         AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[sum(pa_table.value)]   |
   |               |           RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1         |
   |               |             DataSourceExec: partitions=1, partition_sizes=[1]                           |
   |               |                                                                                          |
   +---------------+------------------------------------------------------------------------------------------+
   ```
   
   Here is the `explain(analyze=True)`:
   ```
   +-------------------+--------------------------------------------------------------------------------------------------------+
   | plan_type         | plan                                                                                                   |
   +-------------------+--------------------------------------------------------------------------------------------------------+
   | Plan with Metrics | ProjectionExec: expr=[name@0 as name, sum(pa_table.value)@1 as value], metrics=[output_rows=676, elapsed_compute=5.041µs] |
   |                   |   AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[sum(pa_table.value)], metrics=[output_rows=676, elapsed_compute=460.916µs, spill_count=0, spilled_bytes=0, spilled_rows=0, peak_mem_used=159424] |
   |                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=676, elapsed_compute=29.718µs] |
   |                   |       RepartitionExec: partitioning=Hash([name@0], 16), input_partitions=16, metrics=[fetch_time=1.057575333s, repartition_time=41.56µs, send_time=25.826µs] |
   |                   |         AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[sum(pa_table.value)], metrics=[output_rows=676, elapsed_compute=1.056854918s, spill_count=0, spilled_bytes=0, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=1614708316] |
   |                   |           RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1, metrics=[fetch_time=6.542µs, repartition_time=1ns, send_time=6.181µs] |
   |                   |             DataSourceExec: partitions=1, partition_sizes=[1], metrics=[] |
   +-------------------+--------------------------------------------------------------------------------------------------------+
   datafusion  : 910.99ms
   ```
   
   Note that the `mode=Partial` AggregateExec accounts for almost all of the time: `elapsed_compute=1.056854918s`
   
   
   ## Conclusion
   This plan shows the expected repartitioning (into 16 partitions), so I would expect it to keep all 16 cores busy.
   
   However, only a single core was busy, so I conclude the bottleneck is the input DataSource -- it seems it cannot feed data to the downstream operators fast enough.
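   
   If that is right, one way to test the theory would be to hand DataFusion the data pre-split into multiple partitions so the scan itself is parallel. A sketch, continuing from the setup above (the 16-way split is an assumption, chosen to match the 16 partitions in the plan):
   ```python
   # Register the same Arrow data as 16 separate partitions so that
   # DataSourceExec starts with partitions=16 instead of partitions=1
   ctx2 = SessionContext()
   batches = pa_table.to_batches(max_chunksize=len(pa_table) // 16 + 1)
   ctx2.register_record_batches("pa_table", [[b] for b in batches])
   ctx2.sql(QUERY).collect()
   ```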

