EmilyMatt commented on issue #1389: URL: https://github.com/apache/datafusion-comet/issues/1389#issuecomment-2661320829
The issue can be dissected as follows:

a. There is no reason to run a Partial aggregate natively but not the Final one. Regardless of shuffle, if we support the aggregate expressions, why should the Final stage run in Spark? We should get the performance benefit of running the final aggregate natively as well; if something has to run in Spark and a conversion must be made, it should only be for the result expressions. This is the less critical aspect.

b. Currently, all the supported aggregate expressions are "naive": they produce some Spark primitive and are performed using a primitive aggregate. Even ones such as `Avg` are represented as a `Sum` plus a `Count`. But what happens when Spark passes an aggregate buffer between stages, as in the case of `CollectSet` and `CollectList`? Although the expected result is, for example, `ArrayType(StringType, false)`, Spark passes the serialized intermediate representation between stages, which shows up as a `BinaryType` attribute (usually named `buf#<id>` or similar). When we reach such a plan using columnar shuffle, the shuffle throws an error, because we output an unexpected intermediate type (an `ArrayType` of non-nullable strings) while it expects a `BinaryType`. Suppose we fixed this by iterating over the outputs in `CometHashAggregateExec` and correcting the `DataType` to what Comet actually outputs in the previous stage. The shuffle would then work correctly, calling the right method on the `ColumnarBatch` and writing the data to the shuffle file. However, the Final-stage `HashAggregate` would still attempt to read the aggregate buffer it expects (binary), not an intermediate value of the result type, and would then crash.
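To make the mismatch in (b) concrete, here is a minimal sketch in plain Python with toy stand-ins for Spark's type objects; none of these class or function names come from the real Spark or Comet APIs. It only illustrates why a shuffle writer planned against a `BinaryType` buffer attribute rejects a batch whose column is actually an array of strings:

```python
# Toy stand-ins for Spark SQL DataTypes -- NOT the real Spark classes.
from dataclasses import dataclass

@dataclass(frozen=True)
class BinaryType:
    pass

@dataclass(frozen=True)
class ArrayType:
    element_type: str
    contains_null: bool

def validate_shuffle_input(expected, actual):
    """Mimics the columnar shuffle writer's schema check: the type the plan
    was built with must match the type the child operator actually produced."""
    if expected != actual:
        raise TypeError(f"shuffle expected {expected}, but child produced {actual}")

# Spark plans CollectSet's partial output as an opaque aggregate buffer
# (an attribute like buf#<id> of BinaryType)...
expected = BinaryType()
# ...while a native partial aggregate emits the intermediate value in its
# natural type instead.
actual = ArrayType("string", contains_null=False)

try:
    validate_shuffle_input(expected, actual)
except TypeError as err:
    print(err)
```

The same shape of failure recurs at the Final stage even if the shuffle schema is patched, because the Spark-side `HashAggregate` still deserializes a binary buffer.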
This cannot be detected before stage materialization, because the plan does not exist yet and we do not know whether the result expressions are unsupported. So I think allowing a Spark Final stage on top of a Comet partial stage is slightly shortsighted, unless in the future we write a conversion (maybe using codegen) between the Comet output and Spark's binary aggregate-buffer representation. I think a better solution is to separate out the result expressions in cases where they are not supported.
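The separation I am proposing could look roughly like this. Again a toy Python model, not real Comet code: the node names, the `SUPPORTED_EXPRS` set, and the expression strings are all hypothetical. The idea is to keep the aggregation itself native and lift only the unsupported result expressions into a projection that Spark evaluates on top:

```python
from dataclasses import dataclass

@dataclass
class HashAggregate:
    """Toy model of a final-mode aggregate node."""
    group_by: list
    agg_funcs: list
    result_exprs: list  # expressions applied on top of the aggregate output

@dataclass
class Project:
    """Toy model of a projection node evaluated by Spark."""
    exprs: list
    child: object

# Hypothetical set of result expressions the native engine can evaluate.
SUPPORTED_EXPRS = {"sum_out", "count_out"}

def split_result_expressions(agg: HashAggregate):
    """If some result expressions are unsupported natively, strip them from
    the aggregate (which can then run fully in the native engine) and
    re-apply all result expressions in a Spark-side projection above it."""
    unsupported = [e for e in agg.result_exprs if e not in SUPPORTED_EXPRS]
    if not unsupported:
        return agg  # everything is supported; no rewrite needed
    native_agg = HashAggregate(agg.group_by, agg.agg_funcs, result_exprs=[])
    return Project(exprs=agg.result_exprs, child=native_agg)

plan = split_result_expressions(
    HashAggregate(group_by=["k"], agg_funcs=["sum"], result_exprs=["fancy_udf"]))
print(type(plan).__name__)  # the unsupported expression is lifted into a Project
```

With this shape, both Partial and Final aggregation stay native and only the final projection crosses back into Spark, avoiding the buffer-format conversion entirely.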