Hi everyone,

Currently BeamSQL does not support DISTINCT aggregations. These are queries like:
> SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
> SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2

These are represented in Calcite's logical plan with a distinct flag on aggregation calls, but we ignore the flag when converting to a pipeline.

I see two different ways that we could support this:

1. Two GBKs - For any DISTINCT aggregation, we do one GBK on the <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr, followed by a second GBK on just the <GROUP BY key> to perform the aggregation.
2. Stateful CombineFn - We could implement versions of the combiners used for SUM, COUNT, etc. [1] that maintain state tracking previously seen elements and use it to de-dupe new elements.

Of course, something like (1) is much more scalable, but it is also much more challenging to implement. (2), on the other hand, is trivial to implement but runs the risk of blowing up a worker's memory usage, since each accumulator has to hold every distinct value seen for its key and window.

Personally, I think it could be worthwhile to provide support for DISTINCT quickly with approach (2), and implement (1) as an optimization later. This combiner's state would be partitioned by key and by window, so I think we would be pretty safe from OOM'ing a worker except in some extreme cases (long windows, hot keys, very large batch pipelines, ...).

But I understand this could be controversial, so I wanted to open it up for discussion first: would it be worthwhile to provide support for DISTINCT aggregations quickly with approach #2, or is it better to just reject these queries until we can implement them safely with approach #1?

Thanks,
Brian

[1] https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
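For comparison, here is a minimal plain-Java sketch of both approaches for SUM(DISTINCT v) ... GROUP BY k, assuming input rows are (k, v) pairs. It does not use the Beam SDK: the two GBKs are simulated with in-memory maps, and SumDistinctFn only mirrors the method names of Beam's Combine.CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput). All class and method names here are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class DistinctAggregationSketch {

  /** Approach (1): two grouping stages, simulated in memory (not real Beam GBKs). */
  static Map<String, Long> sumDistinctTwoStage(List<Map.Entry<String, Long>> rows) {
    // Stage 1: group on <GROUP BY key> + <DISTINCT expr>; one entry per pair de-dupes.
    Set<Map.Entry<String, Long>> distinctPairs = new HashSet<>();
    for (Map.Entry<String, Long> row : rows) {
      distinctPairs.add(Map.entry(row.getKey(), row.getValue()));
    }
    // Stage 2: group on <GROUP BY key> alone and apply an ordinary SUM.
    Map<String, Long> out = new TreeMap<>();
    for (Map.Entry<String, Long> pair : distinctPairs) {
      out.merge(pair.getKey(), pair.getValue(), Long::sum);
    }
    return out;
  }

  /** Approach (2): a combiner shaped like Beam's CombineFn, but NOT the real Beam class. */
  static final class SumDistinctFn {
    // The accumulator holds every distinct value seen so far for one key (and window).
    Set<Long> createAccumulator() {
      return new HashSet<>();
    }

    Set<Long> addInput(Set<Long> acc, long input) {
      acc.add(input); // a value already in the set is ignored
      return acc;
    }

    Set<Long> mergeAccumulators(Iterable<Set<Long>> accs) {
      Set<Long> merged = new HashSet<>();
      for (Set<Long> acc : accs) {
        merged.addAll(acc);
      }
      return merged;
    }

    long extractOutput(Set<Long> acc) {
      long sum = 0;
      for (long v : acc) {
        sum += v;
      }
      return sum;
    }
  }

  /** Combines one bundle of rows into per-key partial accumulators. */
  private static Map<String, Set<Long>> partial(
      SumDistinctFn fn, List<Map.Entry<String, Long>> bundle) {
    Map<String, Set<Long>> accs = new HashMap<>();
    for (Map.Entry<String, Long> row : bundle) {
      Set<Long> acc = accs.computeIfAbsent(row.getKey(), k -> fn.createAccumulator());
      accs.put(row.getKey(), fn.addInput(acc, row.getValue()));
    }
    return accs;
  }

  static Map<String, Long> sumDistinctStateful(List<Map.Entry<String, Long>> rows) {
    SumDistinctFn fn = new SumDistinctFn();
    // Simulate two bundles combined independently and then merged per key.
    int mid = rows.size() / 2;
    Map<String, Set<Long>> a = partial(fn, rows.subList(0, mid));
    Map<String, Set<Long>> b = partial(fn, rows.subList(mid, rows.size()));
    Set<String> keys = new HashSet<>(a.keySet());
    keys.addAll(b.keySet());
    Map<String, Long> out = new TreeMap<>();
    for (String k : keys) {
      Set<Long> merged = fn.mergeAccumulators(List.of(
          a.getOrDefault(k, fn.createAccumulator()),
          b.getOrDefault(k, fn.createAccumulator())));
      out.put(k, fn.extractOutput(merged));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Long>> rows = List.of(
        Map.entry("a", 1L), Map.entry("a", 1L), Map.entry("a", 2L),
        Map.entry("b", 5L), Map.entry("b", 5L));
    System.out.println(sumDistinctTwoStage(rows)); // prints {a=3, b=5}
    System.out.println(sumDistinctStateful(rows)); // prints {a=3, b=5}
  }
}
```

Both produce the same result; the difference is where the de-duplication state lives. In the second version every accumulator's set grows with the number of distinct values for its key, which is the memory risk discussed above.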