Hi everyone,
Currently BeamSQL does not support DISTINCT aggregations. These are queries
like:

> SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
> SELECT k, COUNT(DISTINCT k2) FROM t GROUP BY k

These are represented in Calcite's logical plan with a distinct flag on
aggregation calls, but we ignore the flag when converting to a pipeline, so
such queries currently compute the plain (non-DISTINCT) aggregate. I see two
different ways that we could support this:
1. Two GBKs - For any DISTINCT aggregation, we do one GBK on <GROUP BY
key> + <DISTINCT expr> to de-dupe values of expr, followed by a second GBK
on just <GROUP BY key> to perform the aggregation.
2. Stateful CombineFn - We could implement a version of the combiners used
for SUM, COUNT, etc. [1] that maintains state tracking previously seen
elements and uses it to de-dupe new elements.
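For illustration, the semantics of approach (1) can be sketched in plain
Java, with in-memory collections standing in for the two GBKs. This is a
hypothetical sketch: TwoPhaseDistinctSum and its methods are made-up names,
not Beam or BeamSQL APIs, and it only shows what each grouping step
computes, not how it would be wired into a pipeline.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of approach (1): two grouping passes,
// simulated with in-memory collections instead of Beam GroupByKeys.
class TwoPhaseDistinctSum {

  // Phase 1: group on <GROUP BY key> + <DISTINCT expr>. Each distinct
  // (k, v) pair survives exactly once, which de-dupes v per key.
  static Set<Map.Entry<String, Long>> dedupe(List<Map.Entry<String, Long>> rows) {
    return new HashSet<>(rows);
  }

  // Phase 2: group on just <GROUP BY key> and aggregate the de-duped values.
  static Map<String, Long> sumByKey(Set<Map.Entry<String, Long>> distinctPairs) {
    Map<String, Long> sums = new HashMap<>();
    for (Map.Entry<String, Long> pair : distinctPairs) {
      sums.merge(pair.getKey(), pair.getValue(), Long::sum);
    }
    return sums;
  }

  // Equivalent of: SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
  static Map<String, Long> sumDistinct(List<Map.Entry<String, Long>> rows) {
    return sumByKey(dedupe(rows));
  }
}
```

For rows (a, 1), (a, 1), (a, 2), (b, 5), (b, 5), sumDistinct returns a=3 and
b=5; ignoring the DISTINCT flag would instead give a=4 and b=10.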

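Of course, something like (1) is much more scalable, but it is also much
more challenging to implement. Approach (2), on the other hand, is trivial
to implement but runs the risk of blowing up a worker's memory usage, since
every distinct value seen for a key must be kept in the combiner's state.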

Personally, I think it could be worthwhile to provide support for DISTINCT
quickly with approach (2), and implement (1) as an optimization later. This
combiner's state would be partitioned by key and by window, so I think we
would be pretty safe from OOM'ing a worker except in some extreme cases
(long windows, hot keys, very large batch pipelines, ...).
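To make the memory concern concrete, here is a minimal sketch of what an
approach (2) combiner for SUM(DISTINCT v) might look like. DistinctSumFn is
a hypothetical name; its methods mirror the shape of Beam's
Combine.CombineFn (createAccumulator, addInput, mergeAccumulators,
extractOutput), but it deliberately does not extend that class so it
compiles without a Beam dependency.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of approach (2): a SUM(DISTINCT v) combiner whose
// accumulator remembers every value it has seen. Method names mirror
// Beam's Combine.CombineFn, but this class does not extend it.
class DistinctSumFn {

  // Accumulator: the set of distinct values seen so far for one key/window.
  Set<Long> createAccumulator() {
    return new HashSet<>();
  }

  // Adding a value already in the set is a no-op, which de-dupes the input.
  Set<Long> addInput(Set<Long> accum, Long input) {
    accum.add(input);
    return accum;
  }

  // Partial accumulators (e.g. from different bundles) are unioned.
  Set<Long> mergeAccumulators(Iterable<Set<Long>> accums) {
    Set<Long> merged = new HashSet<>();
    for (Set<Long> a : accums) {
      merged.addAll(a);
    }
    return merged;
  }

  // The actual aggregation only happens once all input has been seen.
  Long extractOutput(Set<Long> accum) {
    long sum = 0;
    for (Long v : accum) {
      sum += v;
    }
    return sum;
  }
}
```

Merging accumulators {1, 2} and {2, 4} yields {1, 2, 4} and a result of 7.
The accumulator holds one entry per distinct value for a given key and
window, which is why long windows and hot keys are the risky cases.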


But I understand this could be controversial, so I wanted to open it up for
discussion first: Would it be worthwhile to provide support for DISTINCT
aggregations quickly with approach (2), or is it better to just reject these
queries until we can implement them safely with approach (1)?

Thanks,
Brian

[1]
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
