EmilyMatt opened a new issue, #1389:
URL: https://github.com/apache/datafusion-comet/issues/1389

   ### Describe the bug
   
   In cases where we support a HashAggregate's aggregate functions, we convert 
the Partial-mode HashAggregate, execute it in DataFusion, and then use native 
shuffle to forward the results; with AQE, the next stage does not materialize 
until the shuffle and its stats are available.
   If the agg() call wrapped the aggregates themselves in unsupported 
expressions, we then cannot convert the Final mode.
   This causes the Spark HashAggregate to crash as it attempts to access fields 
that don't exist, and even when they do exist, it expects its own aggregate 
buffer representation, which we don't have.
   
   I believe I've seen a few tests that are ignored because of this.
   I don't think this is a valid situation: we should not crash because of 
previously Comet-run operators that completed successfully.
   
   ### Steps to reproduce
   
   Have a group_by/aggregate that either needs a shuffle or an aggregate buffer, 
such as
   
   .agg(
     collect_set(col("my_column"))
   )
   
   but wrap that collect_set with an unsupported expression, a cast, or something 
similar.
   (I am not sure, but I believe I saw that a similar behaviour can be created 
using a Decimal avg: the Partial aggregate is supported and generates a Sum and 
Count, but producing the final results from that intermediate data is not 
supported natively.)
   
   For example:
   
   .agg(
     concat(flatten(collect_set(col("my_column"))))
   )
   
   can create this behaviour with AQE on. I am not sure whether it is datatype 
related. A fuller, runnable sketch follows below.
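   
   A minimal, self-contained sketch of this shape is below. The session config 
keys are the usual Comet ones but should be checked against the version in use, 
and the data, app name, and column names are purely illustrative:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions._
   
   // AQE on, Comet enabled with native shuffle.
   val spark = SparkSession.builder()
     .appName("comet-final-agg-repro")
     .master("local[*]")
     .config("spark.plugins", "org.apache.spark.CometPlugin")
     .config("spark.sql.adaptive.enabled", "true")
     .config("spark.comet.enabled", "true")
     .config("spark.comet.exec.enabled", "true")
     .config("spark.comet.exec.shuffle.enabled", "true")
     .getOrCreate()
   
   import spark.implicits._
   
   // Use an array column so that flatten(collect_set(...)) type-checks.
   val df = Seq(
     ("a", Seq(1, 2)),
     ("a", Seq(2, 3)),
     ("b", Seq(4))
   ).toDF("key", "my_column")
   
   // The Partial HashAggregate (collect_set) is convertible, but the Final
   // stage's result expressions wrap the aggregate in extra expressions.
   df.groupBy(col("key"))
     .agg(concat(flatten(collect_set(col("my_column")))).as("merged"))
     .show()
   ```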
   
   
   
   ### Expected behavior
   
   Comet should either not convert a Partial HashAggregate whose Final stage 
cannot be converted, or, if it already has, it should gracefully execute the 
Final aggregation and let Spark finish the work without breaking the plan.
   
   ### Additional context
   
   I believe I have a non-invasive solution: if the result expressions are not 
supported, we move them into a separate ProjectExec, which becomes the parent of 
the CometHashAggregateExec. The aggregate itself then has no result expressions 
(like the Partial stage) and exposes the grouping + aggregate attributes as its 
output.
   We then convert to rows and run a ProjectExec with the unsupported 
expressions, ensuring that even if the rest of the stage cannot be run using 
Comet, we don't break an already running workflow.
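   
   To make this concrete, below is a rough sketch of the splitting step expressed 
only in terms of Spark's own plan classes; the actual Comet conversion and the 
columnar-to-row step are elided, and isSupportedResultExpr is a stand-in for 
Comet's real expression-support check:
   
   ```scala
   import org.apache.spark.sql.catalyst.expressions.NamedExpression
   import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
   import org.apache.spark.sql.execution.aggregate.HashAggregateExec
   
   // Sketch only: move unsupported result expressions out of a Final-mode
   // HashAggregateExec so the aggregate itself stays convertible.
   def splitUnsupportedResultExprs(
       agg: HashAggregateExec,
       isSupportedResultExpr: NamedExpression => Boolean): SparkPlan = {
     if (agg.resultExpressions.forall(isSupportedResultExpr)) {
       agg // everything is supported, the node can be converted as-is
     } else {
       // Strip the result expressions down to the grouping + aggregate
       // attributes, i.e. the same output shape a Partial-mode aggregate has.
       val bareOutput: Seq[NamedExpression] =
         agg.groupingExpressions ++ agg.aggregateAttributes
       val bareAgg = agg.copy(resultExpressions = bareOutput)
       // In the real fix, bareAgg would become the CometHashAggregateExec
       // followed by a conversion to rows; the original result expressions are
       // then evaluated by a plain Spark ProjectExec on top.
       ProjectExec(agg.resultExpressions, bareAgg)
     }
   }
   ```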
   
   Will open a PR shortly

